代码之家  ›  专栏  ›  技术社区  ›  Vijay

两个阻塞队列-死锁

  •  2
  • Vijay  · 技术社区  · 15 年前

    我需要原子地操作两个队列,但不确定什么是正确的同步策略:这就是我所尝试的:

    public class transfer {
    
        BlockingQueue firstQ;
        BlockingQueue secondQ;
    
        public moveToSecond() {
            synchronized (this){
                Object a = firstQ.take();
                secondQ.put(a)
            }
        }
    
        public moveToFirst() {
            synchronized(this) {
                Object a = secondQ.take();
                firstQ.put(a);
            }
        }
    }
    

    这是正确的模式吗?在moveToSecond()方法中,如果firstQ为空,则该方法将等待firstQ.take(),但它仍保留此对象的锁。这将阻止moveToFirst()有机会执行。

    4 回复  |  直到 13 年前
        1
  •  2
  •   Adamski    15 年前

    您正在使用正确的方法使用公共互斥锁在两个队列之间进行同步。但是,为了避免您描述的第一个队列为空的情况,我建议重新实现 moveToFirst() moveToSecond() 使用 poll() take() ;例如。

    public void boolean moveToFirst() {
      // Synchronize on simple mutex; could use a Lock here but probably
      // not worth the extra dev. effort.
      synchronzied(queueLock) {
        boolean success;
    
        // Will return immediately, returning null if the queue is empty.
        Object o = firstQ.poll();
    
        if (o != null) {
          // Put could block if the queue is full.  If you're using a bounded
          // queue you could use add(Object) instead to avoid any blocking but
          // you would need to handle the exception somehow.
          secondQ.put(o);
          success = true;
        } else {
          success = false;
        }
      }
    
      return success;
    }
    
        2
  •  1
  •   Seun Osewa    15 年前

    你没有提到的另一个失败条件是,如果firstQ不是空的,但是secondQ是满的,那么这个项目将从firstQ中移除,但是没有地方放置它。

    这是一种乐观的方法;在正常操作中是有效的,但在死锁频繁的情况下是非常低效的(平均延迟取决于所选的超时)

        3
  •  0
  •   JesperE    15 年前

    您应该使用java.util.concurrency中的锁机制,如下所示:

    Lock lock = new ReentrantLock();
    ....
    lock.lock();
    try {
        secondQ.put(firstQ.take());
    } finally {
        lock.unlock();
    }
    

    不再需要在对象类上使用低级wait/notify方法,除非您正在编写新的并发原语。

        4
  •  0
  •   NamshubWriter    15 年前

    在你的代码中,当线程被阻塞时 BlockingQueue.take() this . 直到代码离开同步块或 this.wait() 被称为。

    在这里我假设 moveToFirst() moveToSecond() 应该阻塞,并且您的类控制对队列的所有访问。

    private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue();
    private final Semaphore firstSignal = new Semaphore(0);
    private final BlockingQueue<Object> secondQ = LinkedBlockingQueue();
    private final Semaphore secondSignal = new Semaphore(0);
    
    private final Object monitor = new Object();
    
    public void moveToSecond() {
      int moved = 0;
      while (moved == 0) {
    
        // bock until someone adds to the queue
        firstSignal.aquire();
    
        // attempt to move an item from one queue to another atomically
        synchronized (monitor) {
          moved = firstQ.drainTo(secondQ, 1);
        }
      }
    }
    
    public void putInFirst(Object object) {
      firstQ.put(object);
    
      // notify any blocking threads that the queue has an item
      firstSignal.release();
    }
    

    你会有类似的代码 移动到第一个() putInSecond() while 仅当其他代码可能会从队列中移除项时才需要。如果希望删除队列的方法等待挂起的移动,它应该从信号量中获取一个许可证,并且该信号量应该创建为一个公平的信号量,因此要调用的第一个线程 aquire 将首先发布:

    firstSignal = new Semaphore(0, true);
    

    如果你不想

    1. 让方法在 Runnable 发送给 Executor
    2. 移动到第一个() 使用 BlockingQueue.poll(int, TimeUnit)
    3. 使用 BlockingQueue.drainTo(secondQ, 1) 移动到第一个() 返回布尔值以指示是否成功。

    最后,我质疑是否有必要使这一举动具有原子性。如果多个线程正在从队列中添加或删除,则观察队列将无法判断 移动到第一个()