代码之家  ›  专栏  ›  技术社区  ›  ioreskovic

弹簧集成与反应式WebSockets

  •  1
  • ioreskovic  · 技术社区  · 6 年前

    springintegration提供了非反应性的入站/出站WebSocket适配器,简单地说,它通过内部容器将会话与id关联起来,您对消息进行一些处理,在出站时,它检查消息头中的会话id,并通过该会话发送。

    现在,Spring通过org.springframework.web网站.被动.socket.WebSocketSession和其他类一样,我想知道是否在被动WebSocket堆栈的通道适配器方面也有类似的支持。

    如果没有,是否有任何共同的模式/实践,如何将反应式WS与spring集成消息流集成?

    2 回复  |  直到 6 年前
        1
  •  1
  •   Artem Bilan    6 年前

    这个功能还没有被调用,所以我们还没有考虑这个问题。

    拜托,看看我的眼睛 SandBox . 就目前的情况而言,这是我能提出的最好建议。

    WebSocketHandler 使用适当的URL映射实现。实施只是向前推进了一步 Flux session.receive() IntegrationFlow Publisher 用于 session.send()

    我相信还有很多其他方法可以使用,例如使用 FluxMessageChannel 大豆及其制品 subscribeTo() 从这个 handle(WebSocketSession) impl桥接 通量 @MessagingGateway 电话从 doOnNext() .

    但不确定,如果 WebSocketSession MessageHeaders 在集成流中为其提供访问权限。

        2
  •  0
  •   ioreskovic    6 年前

    谢谢你的见解,阿泰姆,他们帮了大忙。

    我最后做的是:

    从handler,我只需将接收到的消息发送到我的频道,接受它们:

    public class FakeWebSocketHandler implements WebSocketHandler {
        @Override
        public Mono<Void> handle(WebSocketSession session) {
            session.receive()
                .map(wsm -> MessageBuilder.withPayload(wsm).build())
                .subscribe(subscriberChannel::send);
    
            return Mono.never();
        }
    }
    

    在我的流程结束时,在我的最终服务激活器中,我向下游发送响应:

    public class FakeResponder extends AbstractMessageHandler {
        @Override
        protected void handleMessageInternal(Message<?> message) {
            final WebSocketSession session = ...; // obtain WebSocketSession for this message
    
            session.send(Mono.just(message.getPayload())
                .map(this::convertToSomeByteRepresentation)
                .map(bb -> session.binaryMessage(dbf -> dbf.wrap(bb)))
            ).subscribe();
        }
    }