代码之家  ›  专栏  ›  技术社区  ›  dschulten

Spring集成Webflux异常处理

  •  3
  • dschulten  · 技术社区  · 6 年前

    如果在spring集成webflux流中发生异常,异常本身(带有stacktrace)将通过MessagePublishingErrorHandler作为有效负载发送回调用方,它使用来自“errorChannel”头的错误通道,而不是默认的错误通道。

    如何设置类似的错误处理程序 WebExceptionHandler ? 我想生成一个Http状态码,可能还有一个 DefaultErrorAttributes

    简单地定义一个从 errorChannel fluxErrorChannel ,但它似乎也没有用作错误通道,错误不会在我的 errorFlow :

    @Bean
    public IntegrationFlow fooRestFlow() {
        return IntegrationFlows.from(
                WebFlux.inboundGateway("/foo")
                        .requestMapping(r -> r.methods(HttpMethod.POST))
                        .requestPayloadType(Map.class)
                        .errorChannel(fluxErrorChannel()))
                .channel(bazFlow().getInputChannel())
                .get();
    }
    
    @Bean
    public MessageChannel fluxErrorChannel() {
        return MessageChannels.flux().get();
    }
    
    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlows.from(fluxErrorChannel())
                .transform(source -> source)
                .enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
                .get();
    }
    
    @Bean
    public IntegrationFlow bazFlow() {
        return f -> f.split(Map.class, map -> map.get("items"))
                .channel(MessageChannels.flux())
                .<String>handle((p, h) -> throw new RuntimeException())
                .aggregate();
    }
    

    更新

    MessagingGatewaySupport.doSendAndReceiveMessageReactive 在WebFlux.inboundGateway上定义的错误通道从不用于设置错误通道,而错误通道始终是 replyChannel 正在创建的 here :

    FutureReplyChannel replyChannel = new FutureReplyChannel();
    
    Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
        .setReplyChannel(replyChannel)
        .setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
        .setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
        .setErrorChannel(replyChannel)
        .build();
    

    originalErrorChannelHandler 在里面 Mono.fromFuture ,但在我的例子中,错误通道是空的。此外,还有 onErrorResume

    return Mono.fromFuture(replyChannel.messageFuture)
        .doOnSubscribe(s -> {
            if (!error && this.countsEnabled) {
                this.messageCount.incrementAndGet();
            }
        })
        .<Message<?>>map(replyMessage ->
            MessageBuilder.fromMessage(replyMessage)
                .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
                .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
                .build())
        .onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
    

    这是如何运作的?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Gary Russell    6 年前

    它是一只虫子;这个 ErrorMessage 由错误处理程序为异常创建的 errorChannel 标题(必须是 replyChannel 所以网关得到了结果)。然后,网关应该调用错误流(如果存在)并返回该错误流的结果。

    https://jira.spring.io/browse/INT-4541