I have the following configuration for creating two channels (using JmsChannelFactoryBean):
    @Bean
    public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
        JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
        fb.setConnectionFactory(activeMQConnectionFactory);
        fb.setDestinationName("something.queue");
        fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
        return fb;
    }

    @Bean
    public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
        JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
        fb.setConnectionFactory(activeMQConnectionFactory);
        fb.setDestinationName("something.queue.DLQ");
        fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
        return fb;
    }
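something.queue is configured to put dead letters on something.queue.DLQ. I mostly use the Java DSL to configure the app, and I would like to keep it that way if possible.
The case is: a message is taken from jmsChannel and passed to the SFTP outbound gateway. If there is a problem sending the file, the message is put back on jmsChannel as undelivered. After a few attempts it is marked as poison and moved to something.queue.DLQ.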
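To make the redelivery behaviour described above concrete, here is a minimal plain-Java sketch of it. It does not use JMS or ActiveMQ: the Msg class, the drain method, and the limit of 3 deliveries are all hypothetical names and values chosen for illustration, not the broker's real API or my actual settings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class RedeliverySketch {

    /** Hypothetical stand-in for a JMS message carrying a delivery counter. */
    static final class Msg {
        final String body;
        int deliveries = 0;

        Msg(String body) {
            this.body = body;
        }
    }

    /**
     * Delivers every queued message to the handler. A failed delivery puts the
     * message back on the queue until maxDeliveries is reached; after that the
     * message is moved to the dead-letter list instead.
     */
    static List<String> drain(Deque<Msg> queue, Consumer<String> handler, int maxDeliveries) {
        List<String> deadLetters = new ArrayList<>();
        while (!queue.isEmpty()) {
            Msg m = queue.poll();
            m.deliveries++;
            try {
                handler.accept(m.body);
            } catch (RuntimeException e) {
                if (m.deliveries < maxDeliveries) {
                    queue.add(m);            // undelivered: back on the queue
                } else {
                    deadLetters.add(m.body); // poison message: goes to the DLQ
                }
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        Deque<Msg> queue = new ArrayDeque<>();
        queue.add(new Msg("good.txt"));
        queue.add(new Msg("bad.txt"));
        // Handler that always fails for bad.txt, like an SFTP PUT that cannot succeed.
        List<String> dlq = drain(queue, body -> {
            if (body.startsWith("bad")) {
                throw new RuntimeException("sftp put failed");
            }
        }, 3);
        System.out.println("dead letters: " + dlq); // prints "dead letters: [bad.txt]"
    }
}
```

In this model the handler's exception never reaches anything but the queue itself, which is exactly the situation I am asking about: the failure is only visible as a redelivery or a dead letter.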
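- Is it possible to get that information on an error channel when this happens?
- What is the best practice for handling errors when using JMS-backed message channels?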
EDIT 2
The integration flow is defined as:
    IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)
where filesToProcessChannel is the JMS-backed channel and the outbound gateway is defined as:
    @Bean
    public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
        SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate,
                AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
        ArrayList<Advice> adviceChain = new ArrayList<>();
        adviceChain.add(errorHandlingAdvice());
        gateway.setAdviceChain(adviceChain);
        return gateway;
    }
I am trying to capture the exception using an advice:
    @Bean
    public Advice errorHandlingAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        RetryTemplate retryTemplate = new RetryTemplate();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(1);
        retryTemplate.setRetryPolicy(retryPolicy);
        advice.setRetryTemplate(retryTemplate);
        advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
        return advice;
    }
Is this the right approach?
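For clarity, this is the behaviour I expect from a retry advice with a recovery callback, sketched in plain Java without Spring. It reflects my understanding of the intent only and is not Spring Integration's implementation; callWithRecovery and the list standing in for the error channel are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class RetryRecoverSketch {

    /**
     * Calls the handler up to maxAttempts times. If every attempt throws, the
     * last exception is handed to the recoverer and the recoverer's result is
     * returned, so the exception does not propagate to the caller.
     */
    static <I, O> O callWithRecovery(I input, Function<I, O> handler, int maxAttempts,
                                     Function<RuntimeException, O> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handler.apply(input);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again, if attempts remain
            }
        }
        return recoverer.apply(last);
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for filesToProcessErrorChannel.
        List<String> errorChannel = new ArrayList<>();

        String reply = RetryRecoverSketch.<String, String>callWithRecovery(
                "file.txt",
                file -> { throw new RuntimeException("upload failed"); }, // always fails
                1,                                                        // single attempt
                e -> {
                    errorChannel.add(e.getMessage()); // "send" the error instead of rethrowing
                    return null;
                });

        System.out.println("reply: " + reply);                // prints "reply: null"
        System.out.println("error channel: " + errorChannel); // prints "error channel: [upload failed]"
    }
}
```

With a single attempt, the sketch behaves like a plain try/catch that forwards the failure to the error channel. Because the exception is swallowed, the JMS side would see the delivery as successful and would not redeliver.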
EDIT 3
There is definitely something wrong with SftpOutboundGateway and advices (or with me :/):
I used the following advice from the Spring Integration reference:
    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }
When I use:
    return IntegrationFlows.from(filesToProcessChannel)
            .handle((GenericHandler<File>) (payload, headers) -> {
                if (payload.equals("x")) {
                    return null;
                }
                else {
                    throw new RuntimeException("some failure");
                }
            }, spec -> spec.advice(expressionAdvice()))
the advice is called and the error message is printed (as expected), but when I try to use:
    return IntegrationFlows.from(filesToProcessChannel)
            .handle(outboundGateway, spec -> spec.advice(expressionAdvice()))
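the advice is not called and the error goes back to JMS.
The application is using Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE.
EDIT 4
I managed to resolve the advice issue using the following configuration, but I still don't understand why the handler spec doesn't work: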
    @Bean
    IntegrationFlow files(SftpOutboundGateway outboundGateway,
            ...
    ) {
        return IntegrationFlows.from(filesToProcessChannel)
                .handle(outboundGateway)
                ...
                .log(LoggingHandler.Level.INFO)
                .get();
    }

    @Bean
    public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
        SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate,
                AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
        ArrayList<Advice> adviceChain = new ArrayList<>();
        adviceChain.add(expressionAdvice());
        gateway.setAdviceChain(adviceChain);
        return gateway;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }