我正在使用producer consumer使用
ExecutorService
和
BlockingQueue
. 我有一个交易清单,我想核实并采取行动。我得到一个新的交易,不断核实。
考虑到阻塞队列,我只有一个生产者,我希望保持3-5个并行消费者,以加快验证速度。
我可能要等几笔交易完成。(说30秒)。
所以,我会核实,如果它是假的,时间超过30秒,我会放弃它。基本上,我只想在数据项是可消费的情况下消费。
首先,这种方法好吗?或者我应该尝试一些其他的解决方案(到目前为止我还不知道)
这是我改编的代码
this question
:
import java.util.concurrent.*;
public class ProducerConsumerWithES {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue,1));
pes.submit(new Producer(sharedQueue,2));
ces.submit(new Consumer(sharedQueue,1));
ces.submit(new Consumer(sharedQueue,2));
// shutdown should happen somewhere along with awaitTermination
/* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
pes.shutdown();
ces.shutdown();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
我知道我可以
peek()
然后
remove()
如果需要的话。
但当我尝试这样做时,所有其他的消费者都会陷入同样的交易。而其他的交易却从未参与过。
这是因为存储正在排队(FIFO)。
当我删除元素时,这种情况永远不会发生,因为其他消费者可以访问其余的元素,所以请执行验证而不是查看。
我的问题是
偷看()
然后
删除()
或
put()
在消费者方面可以吗?