
Spring Integration: How to create a Spring Reactor Flux from a WebFlux integration flow?

  •  3
  • dschulten  · 6 years ago

    In "How to create a Spring Reactor Flux from Http integration flow?", Artem Bilan mentioned in a comment that this might become possible with the WebFlux integration in the future.

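    Since that comment was written, the WebFlux integration has been factored out into spring-integration-webflux. I tried the following to replicate the MVC-based HTTP-to-Flux approach, replacing Http.inboundChannelAdapter and @GetRequest with WebFlux.inboundChannelAdapter and WebFlux.inboundGateway, respectively: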

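    import org.reactivestreams.Publisher;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.MediaType;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    // Package assumed for Spring Integration 5.0.x; later versions use
    // org.springframework.integration.dsl.MessageChannels instead.
    import org.springframework.integration.dsl.channel.MessageChannels;
    import org.springframework.integration.webflux.dsl.WebFlux;
    import org.springframework.messaging.Message;

    @SpringBootApplication
    public class WebfluxApplication {

      public static void main(String[] args) {
        SpringApplication.run(WebfluxApplication.class, args);
      }

      // Exposes the ids POSTed to /message/{id} as a reactive Publisher.
      @Bean
      public Publisher<Message<String>> reactiveSource() {
        return IntegrationFlows
                .from(WebFlux.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .log()
                .channel(MessageChannels.flux())
                .toReactivePublisher();
      }

      // Serves the publisher above as server-sent events on /events.
      @Bean
      public IntegrationFlow eventMessages() {
        return IntegrationFlows
                .from(WebFlux.inboundGateway("/events")
                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
                .handle((p, h) -> reactiveSource())
                .get();
      }

    }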

    It seems that the reactiveSource() publisher does not receive any messages; at least my .log() statement never logs anything.

    When I replace the reactiveSource() publisher in eventMessages

    .handle((p, h) -> reactiveSource()) 
    

    with a fake publisher

    .handle((p, h) -> Flux.just("foo", "bar"))
    

    I do get an SSE response from

    curl localhost:8080/events
    

    The trace log shows that the handler for reactiveSource() is mapped and that the WebFluxInboundEndpoint.handle method is being invoked:

    2018-05-05 16:50:58.788  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/message/{id}],methods=[POST]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
    2018-05-05 16:50:58.789  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/events],methods=[GET || POST],produces=[text/event-stream]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
    2018-05-05 16:50:59.191  INFO 6552 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:8080
    2018-05-05 16:50:59.192  INFO 6552 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
    2018-05-05 16:50:59.196  INFO 6552 --- [           main] d.e.sample.webflux.WebfluxApplication    : Started WebfluxApplication in 2.608 seconds (JVM running for 3.419)
    2018-05-05 16:51:06.918 DEBUG 6552 --- [ctor-http-nio-2] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
    2018-05-05 16:51:06.932 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
    2018-05-05 16:51:06.933 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
    2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@775cdb20]
    2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
    2018-05-05 16:51:06.967 DEBUG 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
    2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
    2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
    2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
    2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@71f648a3]
    2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
    2018-05-05 16:51:11.364 DEBUG 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
    

    Why is that?

    1 answer
  •  2
  • dschulten  · 6 years ago

    The reason seems to be that WebFluxInboundEndpoint stops processing POST requests that have no body. In doHandle(), the line

    .map(body -> new HttpEntity<>(...)) 
    

    is never executed when the request has no body content:

    private Mono<Void> doHandle(ServerWebExchange exchange) {
        return extractRequestBody(exchange)
                .doOnSubscribe(s -> this.activeCount.incrementAndGet())
                .map(body -> new HttpEntity<>(body, exchange.getRequest().getHeaders()))
                .map(entity -> buildMessage(entity, exchange))
                .flatMap(requestMessage -> {
                    if (this.expectReply) {
                        return sendAndReceiveMessageReactive(requestMessage)
                                .flatMap(replyMessage -> populateResponse(exchange, replyMessage));
                    }
                    else {
                        send(requestMessage);
                        return setStatusCode(exchange);
                    }
                })
                .doOnTerminate(this.activeCount::decrementAndGet);
    
    }
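
    To illustrate why the chain above goes quiet: a source that completes without emitting a value never invokes its map step, so nothing downstream runs either. The sketch below uses java.util.Optional as a dependency-free stand-in for the Mono in doHandle(); it is an analogy, not the Spring Integration code, and the class and method names (EmptySourceDemo, mapperCalls, withDefault) are made up for illustration. In Reactor, the placeholder step in withDefault would roughly correspond to Mono.defaultIfEmpty.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class EmptySourceDemo {

    // Counts how often the mapping step runs for a possibly empty body.
    static int mapperCalls(Optional<String> body) {
        AtomicInteger calls = new AtomicInteger();
        body.map(b -> {
            calls.incrementAndGet();
            return "message[" + b + "]";
        });
        return calls.get();
    }

    // Hypothetical variant: substitute a placeholder body first,
    // so that the mapping step runs even when no body was sent.
    static Optional<String> withDefault(Optional<String> body) {
        return Optional.of(body.orElse(""))
                .map(b -> "message[" + b + "]");
    }

    public static void main(String[] args) {
        // No body: the mapper is skipped entirely.
        System.out.println(mapperCalls(Optional.empty()));
        // One-character body, as in the workaround: the mapper runs once.
        System.out.println(mapperCalls(Optional.of("'")));
        // With a placeholder, a message is built even without a body.
        System.out.println(withDefault(Optional.empty()).get());
    }
}
```

    The first call reports zero mapper invocations and the second reports one, which mirrors the difference between a POST without a body and a POST with a one-character body.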
    

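    Workaround: the caller must send a non-empty request body for this to work. Any content will do; for example, passing a single quote character with -d is enough (it is wrapped in double quotes here so that the command also works in POSIX shells):

    curl -d "'" http://localhost:8080/message/4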
    

    With such a request, my log contains the incoming GenericMessage as expected, and the /events resource starts producing SSE.

    2018-05-05 17:25:24.777 TRACE 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
    2018-05-05 17:25:24.777 DEBUG 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
    2018-05-05 17:25:24.778  INFO 40436 --- [ctor-http-nio-8] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=4, headers={http_requestMethod=POST, Accept=*/*, User-Agent=curl/7.49.1, http_requestUrl=http://localhost:8080/message/4, Host=localhost:8080, id=9a09294d-280a-af3b-0894-23597cf1cb5f, Content-Length=1, contentType=application/x-www-form-urlencoded, timestamp=1525533924778}]
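
    For callers that do not use curl, the same workaround can be sketched with the JDK's java.net.http.HttpClient (Java 11 or later). This is only a minimal sketch: the class name NonEmptyPostClient and the helper buildRequest are made up for illustration, the URL is the one from the curl command, and the Content-Type header is set to match what curl -d sent in the log above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class NonEmptyPostClient {

    // Builds a POST whose body is a single character, mirroring the curl -d call.
    static HttpRequest buildRequest(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString("'"))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = buildRequest("http://localhost:8080/message/4");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Reached when the sample application is not running locally.
            System.out.println("Request failed: " + e);
        }
    }
}
```

    The body content itself is irrelevant here, because the flow takes its payload from the path variable; the body only has to be non-empty so that the endpoint builds a message at all.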