代码之家  ›  专栏  ›  技术社区  ›  impossible

我的消费者能(显然)成为少数情况下的生产者吗

  •  0
  • impossible  · 技术社区  · 5 年前

    我正在使用producer consumer使用 ExecutorService BlockingQueue . 我有一个交易清单,我想核实并采取行动。我得到一个新的交易,不断核实。 考虑到阻塞队列,我只有一个生产者,我希望保持3-5个并行消费者,以加快验证速度。

    我可能要等几笔交易完成。(说30秒)。 所以,我会核实,如果它是假的,时间超过30秒,我会放弃它。基本上,我只想在数据项是可消费的情况下消费。

    首先,这种方法好吗?或者我应该尝试一些其他的解决方案(到目前为止我还不知道)

    这是我改编的代码 this question :

    import java.util.concurrent.*;
    
    public class ProducerConsumerWithES {
    
        public static void main(String args[]){
    
            BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
    
            ExecutorService pes = Executors.newFixedThreadPool(2);
            ExecutorService ces = Executors.newFixedThreadPool(2);
    
            pes.submit(new Producer(sharedQueue,1));
            pes.submit(new Producer(sharedQueue,2));
    
            ces.submit(new Consumer(sharedQueue,1));
            ces.submit(new Consumer(sharedQueue,2));
            // shutdown should happen somewhere along with awaitTermination
            /* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
            pes.shutdown();
            ces.shutdown();
        }
    }
    class Producer implements Runnable {
    
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
    
        public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.threadNo = threadNo;
            this.sharedQueue = sharedQueue;
        }
    
        @Override
        public void run() {
            for(int i=1; i<= 5; i++){
                try {
                    int number = i+(10*threadNo);
                    System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                    sharedQueue.put(number);
                } catch (Exception err) {
                    err.printStackTrace();
                }
            }
        }
    }
    
    class Consumer implements Runnable{
    
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
    
        public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.sharedQueue = sharedQueue;
            this.threadNo = threadNo;
        }
        @Override
        public void run() {
    
            while(true){
                try {
                    int num = sharedQueue.take();
                    System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
                } catch (Exception err) {
                    err.printStackTrace();
                }
            }
        }
    }
    

    我知道我可以 peek() 然后 remove() 如果需要的话。 但当我尝试这样做时,所有其他的消费者都会陷入同样的交易。而其他的交易却从未参与过。 这是因为存储正在排队(FIFO)。

    当我删除元素时,这种情况永远不会发生,因为其他消费者可以访问其余的元素,所以请执行验证而不是查看。

    我的问题是 偷看() 然后 删除() put() 在消费者方面可以吗?

    0 回复  |  直到 5 年前
        1
  •  0
  •   user158037    5 年前

    不,正在做 peek() + remove() 是不安全的。所有消费者都有可能 peek 相同值但删除不同元素。要么接受元素并将元素放回队列(在队列的末尾),要么更早地筛选元素并将它们放在单独的队列中。