代码之家  ›  专栏  ›  技术社区  ›  ropes-nopes

如何在反应式Spring集成中处理反应式类型?

  •  0
  • ropes-nopes  · 技术社区  · 2 年前

    我正在玩一点反应式Spring集成,并尝试执行以下基本操作:

        @Override
        protected IntegrationFlowDefinition<?> buildFlow() {
            return from(someReactiveInboundChannelAdapter)
                    .handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
        }
    

    但这不起作用,因为框架不会认识到这一点 flux 是一个 Flux<?> 例子该框架将其视为 Message<?> 我不知道该从哪里开始写反应堆代码。

    0 回复  |  直到 2 年前
        1
  •  1
  •   Artem Bilan    2 年前

    没错。这个 ReactiveMessageHandlerAdapter 期待着一个兰博达的生日 ReactiveMessageHandler 哪个合同是:

    Mono<Void> handleMessage(Message<?> message);
    

    我不确定是什么驱使你认为输入必须是 Flux .

    我猜你 someReactiveInboundChannelAdapter MessageProducerSupport 确实如此 subscribeToPublisher(Publisher<? extends Message<?>> publisher) 思维方式这样做的目的是以反应式的方式从源获取数据,但仍然会将每个事件作为消息生成到下游通道。所以,这一点应该很清楚 handle() 只接收一条信息,而不是整个信息 通量 .

    如果你想把流量看作通量,可以考虑使用 fluxTransform() 操作人员

    另外,最好不要自己订阅,而是在配置和启动结束后在框架中订阅。

    从技术上讲,你不应该考虑反应型。您只需要分别配置一个流,并只为单个项编写一个逻辑:框架为您执行反应式交互。项目反应堆是一个图书馆 通量 Mono .这就是我们谈论反应型的地方。Spring集成是一个消息传递框架,其通信通过消息完成。最后,对于每一条消息,端点之间的交互是否以反应式方式完成都不重要。因此,您的处理逻辑可以不受反应类型的影响。

    乌达特

    如果你想完全控制 通量 从那以后 一些反应性的边界通道适配器 ,然后你需要这样做:

        @Bean
        public Publisher<Message<Object>> reactiveFlow() {
              return IntegrationFlows.from(someReactiveInboundChannelAdapter)
              .toReactivePublisher();
        }
    

    然后注射 Publisher 无论何时你需要使用它。那就去吧 Flux.from(publisher) 以及任何你需要的反应式操作员,包括 subscribe() .

    这里有一些例子: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

    这个 IntegrationFlowAdapter 无法用于此类型的配置,因为它无法接受 IntegrationFlow 由于 buildFlow() .

    这个 fluxTransform() 不过,我也能帮你做到这一点:

    .fluxTransform(flux -> flux.as(Mono::just))
    

    因此,下游流量的有效载荷将是 Flux<Message<?>> ,您可以自己处理。孩子们回来了 单声道 从那以后 fluxTransform() 将由该框架签署。那个 通量 它的价值在于你在下游流程中的责任。然后你可以用普通的 MessageHandler :

    .handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())