我有一个微服务,它使用
ReactiveMongoRepository
接口。
目标是获取这些对象中的每一个,并将其推送到一个AWS lambda函数(在将其转换为DTO之后)。如果lambda函数的结果在200范围内,则将对象标记为成功,否则忽略。
在一个简单的MongoRepository和一个restTemplate的旧时代,这将是一个微不足道的任务。然而,我试图理解这种被动的交易,并避免阻塞。
这是我编出来的代码,我知道我在阻止
webClient
但我该如何避免呢?
@Override
public Flux<Video> index() {
return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
// Blocking call
final HttpStatus httpStatus = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.block()
.statusCode();
if (httpStatus.is2xxSuccessful()) {
video.setIndexed(true);
}
return videoRepository.save(video);
});
}
我从一个预定的任务调用上面的内容,我并不真正关心index()方法的实际结果,只关心在这个过程中会发生什么。
@Scheduled(fixedDelay = 60000)
public void indexTask() {
indexService
.index()
.log()
.subscribe();
}
我读过很多关于这个主题的博客文章,但它们都是简单的CRUD操作,中间没有任何事情发生,所以不要给我一个如何实现这些东西的完整画面。
有什么帮助吗?