如果唯一的要求是异步处理,那么我强烈建议考虑使用
spring inbuilt @Async
为此目的。然而,使用这种方法将与处理器的现有处理方法不兼容,因为返回类型必须是void或包装在Future类型中。这种限制是有充分理由的,因为异步执行不能立即返回响应,因此未来的包装器是在需要时访问结果的唯一方法。
下面的解决方案大纲列出了在保持接口兼容性的同时,从同步执行切换到异步执行应该做什么。所有重要的观点都用内联注释提及。请注意,尽管这与接口兼容,但返回类型为null(出于上述原因)。如果您必须在控制器中使用返回值,那么这种方法(或任何异步方法)将不起作用,除非您也切换到异步控制器(这是一个具有更广泛更改和设计的不同主题)。以下概述还包括执行前和执行后挂钩。
/**
* Base interface extracted from existing Processor.
* Use this interfae as injection type in the controller along
* with @Qualifier("synchProcessor") for using sync processor.
* Once ready, switch the Qualifier to asynchronousProcessor
* to start using async instead.
*/
public interface BaseProcessor {
public MyResponse process(MyRequest request, String id);
}
@Service("synchProcessor")
@Primary
public class Processor implements BaseProcessor {
@Override
public MyResponse process(MyRequest request, String id) {
// normal existing sync logic
}
}
@Service("asynchronousProcessor")
public class AsynchronousProcessor implements BaseProcessor {
@Autowired
private AsynchQueue queue;
public MyResponse process(MyRequest request, String id) {
queue.process(request,id);
// async execution can not return result immediately
// this is a hack to have this implementation interface
// compatible with existing BaseProcessor
return null;
}
}
@Component
public class AsynchQueue {
@Autowired
@Qualifier("synchProcessor")
private BaseProcessor processor;
/**
* This method will be scheduled by spring scheduler and executd
* asynchronously using an executor. Presented outline will
* call preProcess and postProcess methods before actual method
* execution. Actual method execution is delegated to existing
* synchProcessor resuing it 100% AS-IS.
*/
@Override
@Async
public void process(MyRequest request, String id) {
preProcess(request, id);
MyResponse response = processor.process(request, id);
postProcess(request, id, response);
}
private void preProcess(MyRequest request, String id) {
// add logic for pre processing here
}
private void postProcess(MyRequest request, String id, MyResponse response) {
// add logic for post processing here
}
}
另一个用例可能是批量处理数据库更新,而不是像您已经在做的那样逐个处理它们。如果您的数据量很大,而且数据库更新正在成为瓶颈,这一点尤其有用。在这种情况下,使用阻塞队列是有意义的。以下是可用于此目的的解决方案大纲。同样,尽管这与接口兼容,但返回类型仍然为null。如果批处理需要多个处理线程(或者spring executor),您可以进一步微调这个大纲。对于一个类似的用例,一个带有批量更新的单处理线程就足以满足我的需求,并发db更新由于并发执行中的db级别锁定而带来了更大的问题。
public class MyRequestAndID {
private MyRequest request;
prviate String id;
public MyRequestAndID(MyRequest request, String id){
this.request = request;
this.id = id;
}
public MyRequest getMyRequest() {
return this.request;
}
public String MyId() {
return this.id;
}
}
@Service("asynchronousProcessor")
public class BatchProcessorQueue implements BaseProcessor{
/* Batch processor which can process one OR more items using a single DB query */
@Autowired
private BatchProcessor batchProcessor;
private LinkedBlockingQueue<MyRequestAndID> inQueue = new LinkedBlockingQueue<>();
private Set<MyRequestAndID> processingSet = new HashSet<>();
@PostConstruct
private void init() {
Thread processingThread = new Thread(() -> processQueue());
processingThread.setName("BatchProcessor");
processingThread.start();
}
public MyResponse process(MyRequest request, String id) {
enqueu(new MyRequestAndID(request, id));
// async execution can not return result immediately
// this is a hack to have this implementation interface
// compatible with existing BaseProcessor
return null;
}
public void enqueu(MyRequestAndID job) {
inQueue.add(job);
}
private void processQueue() {
try {
while (true) {
processQueueCycle();
}
} catch (InterruptedException ioex) {
logger.error("Interrupted while processing queue", ioex);
}
}
private void processQueueCycle() throws InterruptedException {
// blocking call, wait for at least one item
MyRequestAndID job = inQueue.take();
processingSet.add(job);
updateSetFromQueue();
processSet();
}
private void processSet() {
if (processingSet.size() < 1)
return;
int qSize = processingSet.size();
preProcess(processingSet)
batchProcessor.processAll(processingSet);
postProcess(processingSet)
processingSet.clear();
}
private void updateSetFromQueue() {
List<MyRequestAndID> inData = Arrays.asList(inQueue.toArray(new MyRequestAndID[0]));
if (inData.size() < 1)
return;
inQueue.removeAll(inData);
processingSet.addAll(inData);
}
private void preProcess(Set<MyRequestAndID> currentSet) {
// add logic for pre processing here
}
private void postProcess(Set<MyRequestAndID> currentSet) {
// add logic for post processing here
}
}