代码之家  ›  专栏  ›  技术社区  ›  manuel aldana

ExecutorService,避免任务队列过满的标准方法

  •  27
  • manuel aldana  · 技术社区  · 14 年前

    我正在使用 ExecutorService

    while(xxx) {
        ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
        ...  
        Future<..> ... = exService.submit(..);
        ...
    }
    

    我的问题是 submit() NUMBER_THREADS 都被占用了。结果是任务队列被许多任务淹没。其结果是,关闭执行服务时 ExecutorService.shutdown() 需要很长时间( ExecutorService.isTerminated()

    目前,我的解决方法是使用信号量来禁止在的任务队列中有太多条目 :

    ...
    Semaphore semaphore=new Semaphore(NUMBER_THREADS);
    
    while(xxx) {
        ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
        ...
        semaphore.aquire();  
        // internally the task calls a finish callback, which invokes semaphore.release()
        // -> now another task is added to queue
        Future<..> ... = exService.submit(..); 
        ...
    }
    

    5 回复  |  直到 6 年前
        1
  •  31
  •   emanciperingsivraren Adam Gent    4 年前

    诀窍是使用固定的队列大小和:

    new ThreadPoolExecutor.CallerRunsPolicy()
    

    ListeningExecutorService .

    private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
    private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
    
    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      5000L, TimeUnit.MILLISECONDS,
                                      new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
    }
    

    任何更好的方法,您都可以考虑像RabbitMQ或ActiveMQ这样的MQ,因为它们有QoS技术。

        2
  •  7
  •   HugoTeixeira    6 年前

    你可以打电话 ThreadPoolExecutor.getQueue().size() 找出等待队列的大小。如果队列太长,可以执行操作。如果队列太长而不能减慢生产者的速度(如果合适的话),我建议在当前线程中运行任务。

        3
  •  6
  •   mdma    14 年前

    一个真正的阻塞ThreadPoolExecutor已经出现在许多人的愿望清单上,甚至有一个JDC bug在它上面打开。 我也面临同样的问题,遇到了这样一个问题: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

    它是BlockingThreadPoolExecutor的一个实现,使用RejectionPolicy实现,该策略使用offer将任务添加到队列中,等待队列有空间。看起来不错。

        4
  •  3
  •   Kevin    14 年前

    你最好创造一个 ThreadPoolExecutor

    在构造函数中,您可以传入一个BlockingQueue,让执行器用作其任务队列。如果传入一个大小受限的BlockingQueue(如 LinkedBlockingQueue ),应该达到你想要的效果。

    ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
    
        5
  •  2
  •   bilal    13 年前

    在executor之前,您放置()并在何时完成任务achive take()。take()必须在任务代码中

        6
  •  1
  •   Yoda    3 年前

    我知道这是太老了,但可能是有用的其他开发人员。所以提交一个 solution .

    因为你要求更好的封装解决方案。它是通过扩展ThreadPoolExecutor和重写submit方法来完成的。

    使用信号量实现的BoundedThreadpoolExecutor。当任务队列已满时,Java executor服务抛出RejectedExecutionException。使用无界队列可能会导致内存不足错误。这可以通过使用executor服务控制提交的任务数来避免。这可以通过使用信号量或实现RejectedExecutionHandler来实现。