您可以使用
Flux
/
Mono
操作员:
// get the URIs from somewhere
Flux<URI> uris = //...
Flux<MyAnyEntity> entities = uris
// map each URI to a HTTP client call and do nothing with the response
.flatMap(uri -> webClient.get().uri(uri).exchange().then())
// chain that call with a call on your repository
.thenMany(repo.findAll());
更新:
此代码自然是异步的,无阻塞的,因此
flatMap
操作员将根据消费者传达的需求同时执行(这就是我们所说的背压)。
如果反应流
Subscriber
request(N)
元素,然后
N
请求可能会同时执行。我不认为这不是你想直接处理的事情,尽管你可以使用窗口操作符来影响微操作。
使用
.subscribeOn(Schedulers.parallel())
在这种情况下不会提高并发性-
as stated in the reference documentation
您应该只将其用于CPU受限的工作。