代码之家  ›  专栏  ›  技术社区  ›  Błażej Karwowski

如何处理JmsChannelFactoryBean错误,是否有可能使用自定义错误通道?

  •  1
  • Błażej Karwowski  · 技术社区  · 6 年前

    我有以下用于创建两个通道的配置(通过使用JmsChannelFactoryBean):

    @Bean
    public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
        JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
        fb.setConnectionFactory(activeMQConnectionFactory);
        fb.setDestinationName("something.queue");
        fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
        return fb;
    }
    
    @Bean
    public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
        JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
        fb.setConnectionFactory(activeMQConnectionFactory);
        fb.setDestinationName("something.queue.DLQ");
        fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
        return fb;
    }
    

    什么东西。队列配置为将死信放在某物上。队列DLQ。我主要使用Java DSL来配置应用程序,如果可能的话,我想保留这个。

    情况是:消息从jmsChannel获取并放入sftp出站网关,如果在发送文件时出现问题,则将消息作为未送达放回jmsChannel。经过几次尝试后,它被设计成毒药,并被放在某处。队列DLQ。

    1. 发生这种情况时,是否有可能在错误通道上获取信息?
    2. 使用JMS支持的消息通道时,处理错误的最佳实践是什么?

    编辑2

    集成流程定义为:

    IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)
    

    其中,filesToProcessChannel是JMS支持的通道,出站网关定义为:

    @Bean
    public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
        SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
        ArrayList<Advice> adviceChain = new ArrayList<>();
        adviceChain.add(errorHandlingAdvice());
        gateway.setAdviceChain(adviceChain);
        return gateway;
    }
    

    我正在尝试使用建议获取异常:

    @Bean
    public Advice errorHandlingAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        RetryTemplate retryTemplate = new RetryTemplate();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(1);
        retryTemplate.setRetryPolicy(retryPolicy);
        advice.setRetryTemplate(retryTemplate);
        advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
        return advice;
    }
    

    这条路对吗?

    编辑3

    SFTPOutboundGateway和advices(或我:/)肯定有问题: 我使用了spring集成参考中的以下建议:

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }
    
    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }
    
    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }
    

    当我使用:

    return IntegrationFlows.from(filesToProcessChannel)
                .handle((GenericHandler<File>) (payload, headers) -> {
                    if (payload.equals("x")) {
                        return null;
                    }
                    else {
                        throw new RuntimeException("some failure");
                    }
                }, spec -> spec.advice(expressionAdvice()))
    

    它会被调用,我会打印出错误消息(这是意料之中的),但当我尝试使用时:

    return IntegrationFlows.from(filesToProcessChannel)
                .handle(outboundGateway, spec -> spec.advice(expressionAdvice()))
    

    不会调用通知,错误消息会返回给JMS。

    该应用程序正在使用Spring Boot v2.0.0。版本,Spring v5.0.4。释放。

    编辑4

    我使用以下配置成功解决了建议问题,但仍然不理解处理程序规范为什么不起作用:

    @Bean
    IntegrationFlow files(SftpOutboundGateway outboundGateway,
                          ...
    ) {
        return IntegrationFlows.from(filesToProcessChannel)
                .handle(outboundGateway)
                ...
                .log(LoggingHandler.Level.INFO)
                .get();
    }
    
    @Bean
    public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
        SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
        ArrayList<Advice> adviceChain = new ArrayList<>();
        adviceChain.add(expressionAdvice());
        gateway.setAdviceChain(adviceChain);
        return gateway;
    }
    
    
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }
    
    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }
    
    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }
    
    1 回复  |  直到 4 年前
        1
  •  1
  •   Gary Russell    6 年前

    由于到DLQ的移动是由代理执行的,因此应用程序没有记录这种情况的机制——它甚至不知道发生了什么。

    在多次尝试之后,您必须自己捕获异常并将消息发布到DLQ中( JMSXDeliveryCount 标头),而不是使用代理策略。

    编辑

    添加 Advice .handle()

    .handle(outboundGateway, e -> e.advice(myAdvice))
    

    哪里 myAdvice 机具 MethodInterceptor

    invoke 方法,在发生故障后,您可以检查传递计数标头,如果它超过阈值,则将消息发布到DLQ(例如,将其发送到另一个订阅了JMS出站适配器的通道),并记录错误;如果未超过阈值,只需返回 invocation.proceed() (或重新显示异常)。

    这样,您就可以控制向DLQ的发布,而不是由代理来完成。您还可以向标头添加更多信息,例如异常。

    编辑2

    你需要这样的东西

    public class MyAdvice implements MethodInterceptor {
    
        @Autowired
        private MessageChannel toJms;
    
        public Object invoke(MethodInvocation invocation) throws Throwable {
            try {
                return invocation.proceed();
            }
            catch Exception(e) {
                Message<?> message = (Message<?>) invocation.getArguments()[0];
                Integer redeliveries = messasge.getHeader("JMXRedeliveryCount", Integer.class);
                if (redeliveries != null && redeliveries > 3) {
                    this.toJms.send(message); // maybe rebuild with additional headers about the error
                }
                else {
                    throw e;
                }
            }
        }
    }
    

    (应该很接近,但我还没有测试过)。它假定您的代理填充该标头。