代码之家  ›  专栏  ›  技术社区  ›  Adamski

灵活的计数下载?

  •  23
  • Adamski  · 技术社区  · 15 年前

    我已经遇到过两次问题,一个生产者线程产生n个工作项,并将它们提交给 ExecutorService 然后需要等待,直到处理完所有n个项目。

    告诫

    • N事先不知道 . 如果是的话,我会简单地创建一个 CountDownLatch 然后有生产线 await() 直到所有工作完成。
    • 使用A CompletionService 不合适,因为尽管我的生产者线程需要阻塞(即通过调用 take() 没有办法表明所有工作都已完成 ,以使生产者线程停止等待。

    我目前最喜欢的解决方案是使用整数计数器, 增量 每当提交一项工作时, 减量 当处理工作项时。在所有n个任务的子任务之后,我的生产者线程将需要等待一个锁,检查是否 counter == 0 无论何时通知。如果消费线程已递减计数器且新值为0,则需要通知生产者。

    有没有更好的方法来解决这个问题,或者在 java.util.concurrent 我应该使用而不是“自己滚动”?

    事先谢谢。

    6 回复  |  直到 6 年前
        1
  •  25
  •   Ravindra babu    7 年前

    java.util.concurrent.Phaser 看起来对你很有用。它计划在Java 7中发布,但可以找到最稳定的版本。 jsr166 的兴趣小组网站。

    相位器是一个光荣的循环屏障。您可以注册n个参与方,当您准备好等待他们在特定阶段的前进时。

    关于它如何工作的一个简单示例:

    final Phaser phaser = new Phaser();
    
    public Runnable getRunnable(){
        return new Runnable(){
            public void run(){
                ..do stuff...
                phaser.arriveAndDeregister();
            }
        };
    }
    public void doWork(){
        phaser.register();//register self
        for(int i=0 ; i < N; i++){
            phaser.register(); // register this task prior to execution 
            executor.submit( getRunnable());
        }
        phaser.arriveAndAwaitAdvance();
    }
    
        2
  •  2
  •   oxbow_lakes    15 年前

    你当然可以用 CountDownLatch 受保护 AtomicReference 这样,您的任务就可以打包为:

    public class MyTask extends Runnable {
        private final Runnable r;
        public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }
    
        public void run() {
            r.run();
            while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
            l.get().countDown();
        }
    }
    

    通知 任务运行他们的工作,然后 旋转 直到设置了倒计时(即知道任务总数)。一旦设置了倒计时,他们就倒计时并退出。提交如下:

    AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
    executor.submit(new MyTask(r, l));
    

    你作品的创作/提交点, 当你知道你已经创建了多少任务时 以下内容:

    latch.set(new CountDownLatch(nTasks));
    latch.get().await();
    
        3
  •  1
  •   Avi    15 年前

    我使用了一个ExecutionCompletionService来实现以下功能:

    ExecutorCompletionService executor = ...;
    int count = 0;
    while (...) {
        executor.submit(new Processor());
        count++;
    }
    
    //Now, pull the futures out of the queue:
    for (int i = 0; i < count; i++) {
        executor.take().get();
    }
    

    这涉及到保持已提交任务的队列,因此如果您的列表任意长,您的方法可能更好。

    但一定要使用 AtomicInteger 对于协调,这样您就可以在一个线程中增加它,在工作线程中减少它。

        4
  •  0
  •   rsp    15 年前

    我假设您的生产者不需要知道队列何时是空的,但需要知道最后一个任务何时完成。

    我会添加一个 waitforWorkDone(producer) 方法。生产者可以添加其n个任务并调用wait方法。wait方法在工作队列不为空且当前没有执行任何任务时阻止传入线程。

    消费者线程 notifyAll() 在waitfor锁上,如果其任务已完成,则队列为空,并且没有执行其他任务。

        5
  •  0
  •   Mumrah    7 年前

    您所描述的与使用标准信号量非常相似,但使用的是“向后”。

    • 您的信号以0个许可开始
    • 每个作业单元完成后,发放一份许可证。
    • 你通过等待获得N个许可证来阻止

    此外,您还可以灵活地只获取M<N许可证,如果您想检查中间状态,这很有用。例如,我正在测试一个异步绑定的消息队列,因此我希望该队列对于某些m<n是满的,因此我可以获取m并检查该队列是否确实满了,然后在从队列中消费消息之后获取剩余的n-m许可证。

        6
  •  0
  •   drekbour    6 年前

    一个独立的Java 8 +方法 Stream 使用单程 Phaser . Iterator / Iterable 变体完全相同。

    public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
        Phaser phaser = new Phaser(1); // 1 = register ourselves
        items.forEach(item -> {
                phaser.register(); // new task
                executor.execute(() -> {
                    handleItem.accept(item);
                    phaser.arrive(); // completed task
                });
        });
        phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
        return phaser.getRegisteredParties()-1; // number of items
    }
    ...
    int recognised = submitAndWait(facesInPicture, this::detectFace, executor)
    

    FYI。对于奇异事件,这是可以的,但如果同时调用它,则涉及 ForkJoinPool 将阻止父线程阻塞。