代码之家  ›  专栏  ›  技术社区  ›  M06H

使用BlockingQueue实现java异步服务

  •  0
  • M06H  · 技术社区  · 4 年前

    我试图在现有同步版本的基础上编写自己的异步服务实现。

    到目前为止,我有以下几点:

    @Service("asynchronousProcessor")
    public class AsynchronousProcessor extends Processor {
    
       private BlockingQueue<Pair<String, MyRequest>> requestQueue = new LinkedBlockingQueue<>();
    
        public AsynchronousProcessor(final PBRequestRepository pbRequestRepository,
                                         final JobRunner jobRunner) {
            super(pbRequestRepository, jobRunner);
        }
    
        @Override
        public MyResponse process(MyRequest request, String id) {
            super.saveTheRequestInDB(request);
            // add task to blocking queue and have it processed in the background
        }
    
    }
    

    基本上我有一个endpoint RestController类 process() 。异步版本应将请求排入BlockingQueue,并在后台进行处理。

    我不确定如何实现这段代码来解决这个问题。我是否应该使用 ExecutorService 以及如何最好地适应当前的设计。

    在执行任务之前或执行任务调用之后使用一些控件是很有用的。

    任何带有一些代码示例的答案都会非常有帮助:)

    0 回复  |  直到 4 年前
        1
  •  0
  •   Avnish    4 年前

    如果唯一的要求是异步处理,那么我强烈建议考虑使用 spring inbuilt @Async 为此目的。然而,使用这种方法将与处理器的现有处理方法不兼容,因为返回类型必须是void或包装在Future类型中。这种限制是有充分理由的,因为异步执行不能立即返回响应,因此未来的包装器是在需要时访问结果的唯一方法。

    下面的解决方案大纲列出了在保持接口兼容性的同时,从同步执行切换到异步执行应该做什么。所有重要的观点都用内联注释提及。请注意,尽管这与接口兼容,但返回类型为null(出于上述原因)。如果您必须在控制器中使用返回值,那么这种方法(或任何异步方法)将不起作用,除非您也切换到异步控制器(这是一个具有更广泛更改和设计的不同主题)。以下概述还包括执行前和执行后挂钩。

    /**
     * Base interface extracted from existing Processor.
     * Use this interfae as injection type in the controller along 
     * with @Qualifier("synchProcessor") for using sync processor.
     * Once ready, switch the Qualifier to asynchronousProcessor
     * to start using async instead.
     */
    public interface BaseProcessor {
        public MyResponse process(MyRequest request, String id);
    }
    
    @Service("synchProcessor")
    @Primary
    public class Processor implements BaseProcessor {
        @Override
        public MyResponse process(MyRequest request, String id) {
            // normal existing sync logic
        }
    }
    
    @Service("asynchronousProcessor")
    public class AsynchronousProcessor implements BaseProcessor {
        @Autowired
        private AsynchQueue queue;
        
            
        public MyResponse process(MyRequest request, String id) {
            queue.process(request,id);
            // async execution can not return result immediately
            // this is a hack to have this implementation interface 
            // compatible with existing BaseProcessor
            return null; 
        }
    }
    
    @Component
    public class AsynchQueue {
        @Autowired
        @Qualifier("synchProcessor")
        private BaseProcessor processor;
    
        /**
         * This method will be scheduled by spring scheduler and executd 
         * asynchronously using an executor. Presented outline will
         * call preProcess and postProcess methods before actual method
         * execution. Actual method execution is delegated to existing
         * synchProcessor resuing it 100% AS-IS.
         */
        @Override
        @Async
        public void process(MyRequest request, String id) {
            preProcess(request, id);
            MyResponse response = processor.process(request, id);
            postProcess(request, id, response);
        }
        
        private void preProcess(MyRequest request, String id) {
            // add logic for pre processing here
        }
        
        private void postProcess(MyRequest request, String id, MyResponse response) {
            // add logic for post processing here
        }
    
    }
    

    另一个用例可能是批量处理数据库更新,而不是像您已经在做的那样逐个处理它们。如果您的数据量很大,而且数据库更新正在成为瓶颈,这一点尤其有用。在这种情况下,使用阻塞队列是有意义的。以下是可用于此目的的解决方案大纲。同样,尽管这与接口兼容,但返回类型仍然为null。如果批处理需要多个处理线程(或者spring executor),您可以进一步微调这个大纲。对于一个类似的用例,一个带有批量更新的单处理线程就足以满足我的需求,并发db更新由于并发执行中的db级别锁定而带来了更大的问题。

    public class MyRequestAndID {
        private MyRequest request;
        prviate String id;
        
        public MyRequestAndID(MyRequest request, String id){
            this.request = request;
            this.id = id;
        }
        
        public MyRequest getMyRequest() {
            return this.request;
        }
        
        public String MyId() {
            return this.id;
        }
    }
    
    @Service("asynchronousProcessor")
    public class BatchProcessorQueue implements BaseProcessor{
        /* Batch processor which can process one OR more items using a single DB query */
        @Autowired
        private BatchProcessor batchProcessor;
    
        private LinkedBlockingQueue<MyRequestAndID> inQueue = new LinkedBlockingQueue<>();
    
        private Set<MyRequestAndID> processingSet = new HashSet<>();
    
        @PostConstruct
        private void init() {
            Thread processingThread = new Thread(() -> processQueue());
            processingThread.setName("BatchProcessor");
            processingThread.start();
        }
        
        public MyResponse process(MyRequest request, String id) {
            enqueu(new MyRequestAndID(request, id));
            // async execution can not return result immediately
            // this is a hack to have this implementation interface 
            // compatible with existing BaseProcessor
            return null; 
        }
    
        public void enqueu(MyRequestAndID job) {
            inQueue.add(job);
        }
    
        private void processQueue() {
            try {
                while (true) {
                    processQueueCycle();
                }
            } catch (InterruptedException ioex) {
                logger.error("Interrupted while processing queue", ioex);
            }
        }
    
        private void processQueueCycle() throws InterruptedException {
            // blocking call, wait for at least one item
            MyRequestAndID job = inQueue.take();
            processingSet.add(job);
            updateSetFromQueue();
            processSet();
        }
    
        private void processSet() {
            if (processingSet.size() < 1)
                return;
            int qSize = processingSet.size();
            preProcess(processingSet)
            batchProcessor.processAll(processingSet);
            postProcess(processingSet)
            processingSet.clear();
        }
    
        private void updateSetFromQueue() {
            List<MyRequestAndID> inData = Arrays.asList(inQueue.toArray(new MyRequestAndID[0]));
            if (inData.size() < 1)
                return;
            inQueue.removeAll(inData);
            processingSet.addAll(inData);
        }
        
        private void preProcess(Set<MyRequestAndID> currentSet) {
            // add logic for pre processing here
        }
        
        private void postProcess(Set<MyRequestAndID> currentSet) {
            // add logic for post processing here
        }
    }