没错。这个
ReactiveMessageHandlerAdapter
期待着一个兰博达的生日
ReactiveMessageHandler
哪个合同是:
Mono<Void> handleMessage(Message<?> message);
我不确定是什么驱使你认为输入必须是
Flux
.
我猜你
someReactiveInboundChannelAdapter
是
MessageProducerSupport
确实如此
subscribeToPublisher(Publisher<? extends Message<?>> publisher)
思维方式这样做的目的是以反应式的方式从源获取数据,但仍然会将每个事件作为消息生成到下游通道。所以,这一点应该很清楚
handle()
只接收一条信息,而不是整个信息
通量
.
如果你想把流量看作通量,可以考虑使用
fluxTransform()
操作人员
另外,最好不要自己订阅,而是在配置和启动结束后在框架中订阅。
从技术上讲,你不应该考虑反应型。您只需要分别配置一个流,并只为单个项编写一个逻辑:框架为您执行反应式交互。项目反应堆是一个图书馆
通量
和
Mono
.这就是我们谈论反应型的地方。Spring集成是一个消息传递框架,其通信通过消息完成。最后,对于每一条消息,端点之间的交互是否以反应式方式完成都不重要。因此,您的处理逻辑可以不受反应类型的影响。
乌达特
如果你想完全控制
通量
从那以后
一些反应性的边界通道适配器
,然后你需要这样做:
@Bean
public Publisher<Message<Object>> reactiveFlow() {
return IntegrationFlows.from(someReactiveInboundChannelAdapter)
.toReactivePublisher();
}
然后注射
Publisher
无论何时你需要使用它。那就去吧
Flux.from(publisher)
以及任何你需要的反应式操作员,包括
subscribe()
.
这里有一些例子:
https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
这个
IntegrationFlowAdapter
无法用于此类型的配置,因为它无法接受
IntegrationFlow
由于
buildFlow()
.
这个
fluxTransform()
不过,我也能帮你做到这一点:
.fluxTransform(flux -> flux.as(Mono::just))
因此,下游流量的有效载荷将是
Flux<Message<?>>
,您可以自己处理。孩子们回来了
单声道
从那以后
fluxTransform()
将由该框架签署。那个
通量
它的价值在于你在下游流程中的责任。然后你可以用普通的
MessageHandler
:
.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())