代码之家  ›  专栏  ›  技术社区  ›  dschulten

spring集成:如何像SSE一样交付延迟的细节

  •  2
  • dschulten  · 技术社区  · 6 年前

    我有一个项目清单,我想尽快检索和返回。

    对于我还需要检索详细信息的每个项目,它们可能会在几秒钟后返回。

    当然,我可以用HTTP网关创建两个不同的路由,首先请求列表,然后请求细节。然而,我必须等到所有的细节都来了。我想马上把清单寄回去,一拿到详细资料就马上寄回去。

    更新

    按照Artem Bilan的建议,我的流返回一个通量作为有效负载,它将项目列表合并为Mono,并将处理过的项目合并为通量。

    请注意,下面的示例通过调用 toUpperCase ;我的实际用例需要路由和传出呼叫来获取每个项目的详细信息:

        @Bean
    public IntegrationFlow sseFlow() {
        return IntegrationFlows
                .from(WebFlux.inboundGateway("/strings/sse")
                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
                        .mappedResponseHeaders("*"))
                .enrichHeaders(Collections.singletonMap("aHeader", new String[]{"foo", "bar"}))
                .transform("headers.aHeader")
                .<String[]>handle((p, h) -> {
                    return Flux.merge(
                            Mono.just(p),
                            Flux.fromArray(p)
                                    .map(t -> {
                                        return t.toUpperCase();
                                        // return detailsResolver.resolveDetail(t);
                                    }));
                })
                .get();
    }
    

    更接近我的目标。当我使用curl从这个流中请求数据时,我会立即得到项目列表,稍后再得到处理过的项目:

    λ curl http://localhost:8080/strings/sse
    data:["foo","bar"]
    
    data:FOO
    
    data:BAR
    

    虽然简单地将字符串转换为大写可以很好地工作,但我很难使用 WebFlux.outboundGateway . 这个 detailsResolver 在上述注释中,代码定义如下:

    @MessagingGateway
    public interface DetailsResolver {
    
        @Gateway(requestChannel = "itemDetailsFlow.input")
        Object resolveDetail(String item);
    
    }
    
    @Bean
    IntegrationFlow itemDetailsFlow() {
        return f -> f.handle(WebFlux.<String>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:3003/rest/path/")
                        .path(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(JsonNode.class)
                .replyPayloadToFlux(false));
    }
    

    当我在报纸上发表评论时 细节求解器 电话和评论 t.toUpperCase ,的 outboundGateway 似乎设置正确(日志显示订阅者存在,请求已发出信号),但从未得到响应(未到达中的断点) ExchangeFunctions.exchange#91 ).

    我保证 DetailsResolver 它本身是通过从上下文中获取它作为bean并调用它的方法来工作的——这给了我一个JsonNode响应。

    原因是什么?

    1 回复  |  直到 6 年前
        1
  •  2
  •   Artem Bilan    6 年前

    是的,我不会用 toReactivePublsiher() 因为您有当前请求的上下文。你需要每个请求的流量。我会用这样的东西 Flux.merge(Publisher<? extends I>... sources) ,第一个 Flux 第二个是每个项目的详细信息(比如 Tuple2 ).

    为此,您确实可以使用以下内容:

      IntegrationFlows
                    .from(WebFlux.inboundGateway("/sse")
                            .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
    

    下游的水流会产生 通量 作为应答的有效载荷。

    我在测试用例中有这样一个示例:

        @Bean
        public IntegrationFlow sseFlow() {
            return IntegrationFlows
                    .from(WebFlux.inboundGateway("/sse")
                            .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
                            .mappedResponseHeaders("*"))
                    .enrichHeaders(Collections.singletonMap("aHeader", new String[] { "foo", "bar", "baz" }))
                    .handle((p, h) -> Flux.fromArray((String[]) h.get("aHeader")))
                    .get();
        }