代码之家  ›  专栏  ›  技术社区  ›  randers

用Java中的生产者和消费者防止有界执行器服务中可能出现的死锁情况

  •  3
  • randers  · 技术社区  · 9 年前

    考虑一下这个示例代码:(我已经简化了很多类,所以它们更容易阅读)

    制片人

    class RandomIntegerProducer implements Callable<Void>
    {
        private final BlockingQueue<? super Integer> queue;
        private final Random random;
    
        /* Boilerplate constructor... */
    
        @Override
        public Void call()
        {
            while (!Thread.interrupted())
            {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    queue.put(random.nextInt());
                } catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return null;
        }
    }
    

    这是一个简单、简明的任务示例,该任务每秒将一个随机数放入队列,并可通过 Thread.interrupt() .

    消费者

    class NumberConsumer implements Callable<Void>
    {
        private final BlockingQueue<? extends Number> queue;
        private final Appendable target;
    
        /* Boilerplate constructor... */
    
        @Override
        public Void call() throws IOException
        {
            while (!Thread.interrupted())
            {
                try {
                    target.append(queue.take().toString());
                } catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return null;
        }
    }
    

    消费者从队列中提取数字并将其打印到指定的 Appendable 。可以通过取消 线程中断() .

    起始代码

    class ProducerConsumerStarter
    {
        /* Notice this is a fixed size (e.g. bounded) executor service */
        private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);
    
        public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers)
        {
            List<Callable<Void>> callables = new ArrayList<>();
            BlockingQueue<Integer> commonQueue = new ArrayBlockingQueue<>(16);
            for (int i = 0; i < producers; i++)
            {
                callables.add(new RandomIntegerProducer(commonQueue, new Random()));
            }
            for (int i = 0; i < consumers; i++)
            {
                callables.add(new NumberConsumer(commonQueue, System.out));
            }
            // Submit them all (in order)
            return callables.stream().map(SERVICE::submit).collect(Collectors.toList());
        }
    }
    

    此实用程序方法将任务提交给有界执行器服务(按顺序-首先是所有生产者,然后是所有消费者)

    失败的客户端代码

    public class FailingExaple {
        @org.junit.Test
        public void deadlockApplication() throws Exception
        {
            List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
            for (Future<Void> future : futures)
            {
                System.out.println("Getting future");
                future.get();
            }
        }
    }
    

    此示例代码通过死锁该并发程序和任何其他未来的起始代码调用方而使其失败。


    问题是:我如何既能防止我的应用程序在高负载下产生大量的线程(我希望任务被排队),又能防止死锁,只因为执行器被生产者污染了?

    即使这个示例很明显100%都会失败,考虑一个并发程序,在不幸的情况下,它只会让有界的执行器充满生产者,你会遇到同样的一般问题。

    1 回复  |  直到 4 年前
        1
  •  2
  •   dezhik    9 年前

    什么是死锁? Java Documentation

    死锁描述了两个或多个线程被阻塞的情况 永远,等待着对方。

    因此,当第一个线程持有monitor1并试图获取monitor2时,就会发生死锁,而第二个线程持有monitor2并试图获取monitor1。
    代码中没有死锁,因为没有 two or more threads .. waiting for each other 。有生产者在队列中等待空间,没有消费者,因为由于执行者的线程数,他们没有被调度。

    而且 “失败的客户端代码” 将始终阻塞线程,即使 startIntegerProducerConsumer(1,1)

    public class FailingExaple {
        @org.junit.Test
        public void deadlockApplication() throws Exception
        {
            List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
            for (Future<Void> future : futures)
            {
                System.out.println("Getting future");
                future.get();
            }
        }
    }
    

    因为你的生产者和消费者一直在运行,直到出现明显的中断,这在 deadlockApplication() .

    您的代码应该如下所示

    for (Future<Void> future : futures)
    {
        if (future.isDone()) {
            try {
                System.out.println("Getting future");
                future.get();
            } catch (CancellationException ce) {
    
            } catch (ExecutionException ee) {
    
            }
        } else {
            System.out.println("The future is not done, cancelling it");
            if (future.cancel(true)) {
                System.out.println("task was cancelled");
            } else {
                //handle case when FutureTask#cancel(boolean mayInterruptIfRunning) wasn't cancelled
            }
        }
    }
    

    此循环将获得已完成任务的结果,并取消未完成的任务。

    @瓦诺克尔 没错,最好有两个线程池,一个用于消费者,另一个用于生产者。
    这样地

    class ProducerConsumerStarter
    {
        private static final ExecutorService CONSUMERS = Executors.newFixedThreadPool(8); 
        private static final ExecutorService PRODUCERS = Executors.newFixedThreadPool(8); 
    
        public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers) {
            ...
        }
    }
    

    startIntegerProducerConsumer(int, int) 其相应地提交消费者和生产者。
    但是在这种情况下,新的任务将被排队,直到之前提交的生产者和消费者完成后才开始(如果这些任务不被中断,则不会发生)。

    您还可以进一步优化生产者的代码。首次更改代码

    class RandomIntegerProducer implements Runnable
    {
        private final BlockingQueue<? super Integer> queue;
        private final Random random;
        ...
        @Override
        public void run()
        {
            queue.offer(random.nextInt());
        }
    }
    

    然后开始将生产者提交到 ScheduledExecutorService 使用 scheduleWithFixedDelay(producer, 1, 1, TimeUnit.SECONDS) 。这一变化将有助于保持生产商的运行,而不会相互阻碍。但它也会稍微改变应用程序的语义。
    你可以保留 ScheduledExecutorService (对于生产者)初始化为类变量。唯一的不便是您必须更改退货类型 startIntegerProducerConsumer(int producers, int consumers) 方法到 List<Future<?>> 但实际上 ScheduledFutures<?> 返回,返回 scheduleWithFixedDelay(..) 仍然是类型 Future<Void> . 如果可能的话,您可以在消费新生成的号码期间对消费者执行相同的操作,最大延迟等于 delay (传递给 scheduleWithFixedDelay() )对你来说很好。

    我希望我的回答能帮一点忙。