我有一个项目清单,我想尽快检索和返回。
对于我还需要检索详细信息的每个项目,它们可能会在几秒钟后返回。
当然,我可以用HTTP网关创建两个不同的路由,首先请求列表,然后请求细节。然而,我必须等到所有的细节都来了。我想马上把清单寄回去,一拿到详细资料就马上寄回去。
更新
按照Artem Bilan的建议,我的流返回一个通量作为有效负载,它将项目列表合并为Mono,并将处理过的项目合并为通量。
请注意,下面的示例通过调用
toUpperCase
;我的实际用例需要路由和传出呼叫来获取每个项目的详细信息:
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/strings/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
.mappedResponseHeaders("*"))
.enrichHeaders(Collections.singletonMap("aHeader", new String[]{"foo", "bar"}))
.transform("headers.aHeader")
.<String[]>handle((p, h) -> {
return Flux.merge(
Mono.just(p),
Flux.fromArray(p)
.map(t -> {
return t.toUpperCase();
// return detailsResolver.resolveDetail(t);
}));
})
.get();
}
更接近我的目标。当我使用curl从这个流中请求数据时,我会立即得到项目列表,稍后再得到处理过的项目:
λ curl http://localhost:8080/strings/sse
data:["foo","bar"]
data:FOO
data:BAR
虽然简单地将字符串转换为大写可以很好地工作,但我很难使用
WebFlux.outboundGateway
. 这个
detailsResolver
在上述注释中,代码定义如下:
@MessagingGateway
public interface DetailsResolver {
@Gateway(requestChannel = "itemDetailsFlow.input")
Object resolveDetail(String item);
}
@Bean
IntegrationFlow itemDetailsFlow() {
return f -> f.handle(WebFlux.<String>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:3003/rest/path/")
.path(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(JsonNode.class)
.replyPayloadToFlux(false));
}
当我在报纸上发表评论时
细节求解器
电话和评论
t.toUpperCase
,的
outboundGateway
似乎设置正确(日志显示订阅者存在,请求已发出信号),但从未得到响应(未到达中的断点)
ExchangeFunctions.exchange#91
).
我保证
DetailsResolver
它本身是通过从上下文中获取它作为bean并调用它的方法来工作的——这给了我一个JsonNode响应。
原因是什么?