代码之家  ›  专栏  ›  技术社区  ›  Vidar S. Ramdal

与Java线程的双向通信

  •  2
  • Vidar S. Ramdal  · 技术社区  · 14 年前

    在我的应用程序中,我正在执行一些繁重的查找操作。这些操作必须在单个线程内完成(持久性框架限制)。

    我要缓存结果。因此,我有一个乌姆卡车班,里面有一个工人:

    public class UMRCache {
      private Worker worker;
      private List<String> requests = Collections.synchronizedList<new ArrayList<String>>());
      private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>());
      public UMRCache(Repository repository) {
        this.worker = new Worker(repository);
        this.worker.start();
      }
    
     public Object get(String key) {
       if (this.cache.containsKey(key)) {
        // If the element is already cached, get value from cache
         return this.cache.get(key);
       }
       synchronized (this.requests) {
         // Add request to queue
         this.requests.add(key);
         // Notify the Worker thread that there's work to do
         this.requests.notifyAll();
       }
       synchronized (this.cache) {
         // Wait until Worker has updated the cache
         this.cache.wait();
        // Now, cache should contain a value for key
         return this.cache.get(key);
       }
     }
    
     private class Worker extends Thread {
       public void run() {
          boolean doRun = true;
          while (doRun) {
             synchronized (requests) {
                while (requests.isEmpty() && doRun) {
                   requests.wait(); // Wait until there's work to do
                }
                synchronized (cache) {
                   Set<String> processed = new HashSet<String>();
                   for (String key : requests) {
                     // Do the lookup
                     Object result = respository.lookup(key);
                     // Save to cache
                     cache.put(key, result);
                     processed.add(key); 
                   }
                   // Remove processed requests from queue
                   requests.removeAll(processed);
                   // Notify all threads waiting for their requests to be served
                   cache.notifyAll();
                } 
             }
          }
       }
    }
    

    }

    我有一个测试案例: 公共类umrcachetest扩展了测试用例{ 私人umrcache umrcache;

    public void setUp() throws Exception {
        super.setUp();
        umrCache = new UMRCache(repository);
    }
    
    public void testGet() throws Exception {
        for (int i = 0; i < 10000; i++) {
            final List fetched = Collections.synchronizedList(new ArrayList());
            final String[] keys = new String[]{"key1", "key2"};
            final String[] expected = new String[]{"result1", "result2"}
            final Random random = new Random();
    
            Runnable run1 = new Runnable() {
                public void run() {
                    for (int i = 0; i < keys.length; i++) {
                        final String key = keys[i];
                        final Object result = umrCache.get(key);
                        assertEquals(key, results[i]);
                        fetched.add(um);
                        try {
                            Thread.sleep(random.nextInt(3));
                        } catch (InterruptedException ignore) {
                        }
                    }
                }
            };
            Runnable run2 = new Runnable() {
                public void run() {
                    for (int i = keys.length - 1; i >= 0; i--) {
                        final String key = keys[i];
                        final String result = umrCache.get(key);
                        assertEquals(key, results[i]);
                        fetched.add(um);
                        try {
                            Thread.sleep(random.nextInt(3));
                        } catch (InterruptedException ignore) {
                        }
                    }
                }
            };
    
            final Thread thread1 = new Thread(run1);
            thread1.start();
            final Thread thread2 = new Thread(run2);
            thread2.start();
            final Thread thread3 = new Thread(run1);
            thread3.start();
            thread1.join();
            thread2.join();
            thread3.join();
            umrCache.dispose();
            assertEquals(6, fetched.size());
        }
    }
    

    }

    测试随机失败,大约10次测试中有1次失败。它将在最后一个断言时失败:断言等于(6,fetched.size()),断言等于(key,results[i]),有时测试运行程序将永远不会完成。

    所以我的线程逻辑有点问题。有什么小窍门吗?

    编辑:

    多亏了所有的帮助,我现在可能已经破解了。 解决方案似乎是:

     public Object get(String key) {
       if (this.cache.containsKey(key)) {
        // If the element is already cached, get value from cache
         return this.cache.get(key);
       }
       synchronized (this.requests) {
         // Add request to queue
         this.requests.add(key);
         // Notify the Worker thread that there's work to do
         this.requests.notifyAll();
       }
       synchronized (this.cache) {
         // Wait until Worker has updated the cache
         while (!this.cache.containsKey(key)) {
           this.cache.wait();
         }
        // Now, cache should contain a value for key
         return this.cache.get(key);
       }
     }
    
    3 回复  |  直到 14 年前
        1
  •  2
  •   maxim_ge    14 年前

    get()方法逻辑可能会错过结果并卡住

    
    
       synchronized (this.requests) {
         // Add request to queue
         this.requests.add(key);
         // Notify the Worker thread that there's work to do
         this.requests.notifyAll();
       }
    
       // ----- MOMENT1.  If at this moment Worker puts result into cache it
       // will be missed since notification will be lost
    
       synchronized (this.cache) {
         // Wait until Worker has updated the cache
         this.cache.wait();
    
        // ----- MOMENT2.  May be too late, since cache notifiation happened before at MOMENT1
    
        // Now, cache should contain a value for key
         return this.cache.get(key);
       }
    
        2
  •  1
  •   djna    14 年前

    变量 取来 在您的测试中是一个arraylist,从您的两个匿名可运行实例中访问和更新。

    arraylist不是线程安全的,来自以下文档:

    请注意,此实现不是 同步的。如果多线程 访问ArrayList实例 同时,和至少一个 线程修改列表 在结构上,它必须是同步的 在外部。(结构修改 是添加或删除的任何操作 一个或多个元素,或显式 调整支持数组的大小;仅 设置元素的值不是 结构上的修改。)这是 通常由 在某个对象上同步 自然地封装列表。如果没有 这样的对象存在,列表应该 “包装”使用 collections.synchronizedList方法。 最好在创建时完成,以 防止意外不同步 访问列表:

    因此,我认为你的测试需要一些调整。

        3
  •  1
  •   Victor Sorokin    14 年前

    我注意到您在缓存中的查找不是原子操作:

    if (this.cache.containsKey(key)) {
        // If the element is already cached, get value from cache
        return this.cache.get(key);
    }
    

    由于代码中从未从缓存中删除,因此此代码始终会获取一些值。但是,如果将来计划清除缓存,那么这里的原子性不足将成为一个问题。