代码之家  ›  专栏  ›  技术社区  ›  dschulten

spring集成:向选择性消费者发送排队消息

  •  2
  • dschulten  · 技术社区  · 6 年前

    我有一个spring集成流,它生成的消息应该一直保存,等待合适的消费者出现并使用它们。

    @Bean
    public IntegrationFlow messagesPerCustomerFlow() {
        return IntegrationFlows.
                from(WebFlux.inboundChannelAdapter("/messages/{customer}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .requestPayloadType(JsonNode.class)
                        .headerExpression("customer", "#pathVariables.customer")
                )
                .channel(messagesPerCustomerQueue()) 
                .get();
    }
    
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedRate(100);
    }
    
    @Bean
    public QueueChannel messagesPerCustomerQueue() {
        return MessageChannels.queue()
                .get();
    }
    

    队列中的消息应通过http作为服务器发送的事件传递,如下所示。

    PublisherSubscription只是发布者和IntegrationFlowRegistration的持有者,后者用于在不再需要时销毁动态创建的流(请注意,GET的传入消息没有内容,Webflux集成没有正确处理这些内容,因此需要一个小的解决方法来访问推送到ATM的path变量) customer 标题):

    @Bean
    public IntegrationFlow eventMessagesPerCustomer() {
        return IntegrationFlows
           .from(WebFlux.inboundGateway("/events/{customer}")
                .requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
                .headerExpression("customer", "#pathVariables.customer")
                .payloadExpression("''") // neeeded to make handle((p,h) work
           )
           .log()
           .handle((p, h) -> {
               String customer = h.get("customer").toString();
               PublisherSubscription<JsonNode> publisherSubscription =
                   subscribeToMessagesPerCustomer(customer);
               return Flux.from(publisherSubscription.getPublisher())
                       .map(Message::getPayload)
                       .doFinally(signalType ->
                          publisherSubscription.unsubscribe());
           })
           .get();
    }
    

    上面对服务器发送事件的请求动态注册了一个流,该流根据需要向队列通道订阅 selective consumer 通过一个带有 throwExceptionOnRejection(true) .遵循 Message Handler chain 这将确保信息被提供给所有消费者,直到人们接受它。

    public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
        IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
                .filter("headers.customer=='" + customer + "'",
                        filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
        Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
    
        IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
                .register();
    
        return new PublisherSubscription<>(messagePublisher, registration);
    }
    

    这种构造原则上是可行的,但存在以下问题:

    • 在完全没有订阅者的情况下发送到队列的消息会导致 MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
    • 当没有匹配的订户时发送到队列的消息会导致 AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed .

    我想要的是消息保留在队列中,并反复提供给所有订阅者,直到消息被消费或过期(一个适当的选择性消费者)。我该怎么做?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Artem Bilan    6 年前

    请注意,GET的传入消息没有内容,Webflux集成没有正确处理这些内容

    我不理解这种担忧。

    这个 WebFluxInboundEndpoint 使用以下算法:

    if (isReadable(request)) {
       ...
    else {
        return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
    }
    

    哪里 GET 方法真的很有用 else 树枝还有 payload 要发送的消息的一部分是 MultiValueMap .此外,我们最近还与您一起解决了 POST ,该版本也已发布 5.0.5 : https://jira.spring.io/browse/INT-4462

    Dispatcher没有订户

    不可能在路上发生 QueueChannel 原则上那里根本没有调度员。它只是队列和发送者 提供 要存储的消息。你错过了与我们分享的其他东西。但让我们用它自己的名字来称呼事物:the messagesPerCustomerQueue 这不是一个 排队通道 在你的申请中。

    使现代化

    关于:

    我想要的是消息保留在队列中,并反复提供给所有订阅者,直到消息被消费或过期(一个适当的选择性消费者)

    只有我们看到的是 PollableJmsChannel 基于嵌入式ActiveMQ,以尊重消息的TTL。作为该队列的消费者,您应该拥有 PublishSubscribeChannel setMinSubscribers(1) 使 MessagingTemplate MessageDeliveryException 当还没有订户的时候。通过这种方式,JMS事务将被回滚,消息将返回到队列以进行下一个轮询周期。

    内存中的问题 排队通道 没有事务性的重新传递,一旦从该队列中轮询消息,消息就会丢失。

    另一个类似于JMS(事务)的选项是 JdbcChannelMessageStore 对于 排队通道 .虽然这样我们没有TTL功能。。。