我正试图为我的SpringCloud数据流创建一个定制的异常处理程序,以路由一些需要重新排队的错误和其他需要dlq'd的错误。
为此,我使用了全局Spring集成“ErrorChannel”和基于异常类型的路由。
这是Spring集成错误路由器的代码:
package com.acme.error.router;
import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);
public static final String ERROR_CHANNEL = "errorChannel";
@Router(inputChannel = ERROR_CHANNEL)
public String onError(Message<Object> message) {
LOGGER.debug("ERROR ROUTER - onError");
if(message.getPayload() instanceof MessageTransformationException) {
MessageTransformationException exception = (MessageTransformationException) message.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
if(exceptionChainContainsDlq(exception)) {
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
return ErrorMessageChannels.REQUEUE_CHANNEL;
}
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
...
}
错误路由器由每个流应用程序通过Spring引导应用程序上的包扫描为每个应用程序拾取:
@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}
当部署并运行本地Spring Cloud数据流服务器(1.5.0版)时,如果抛出了DLQException,则消息将成功路由到ErrorRouter中的OnError方法,然后放入DLQ主题。
但是,当使用scdf kubernetes服务器(也是版本1.5.0-release)将其部署为docker容器时,永远不会命中onerror方法。(路由器开头的日志语句从不输出)
在流应用程序的启动日志中,bean似乎被正确地拾取并注册为ErrorChannel的侦听器,但是出于某种原因,当抛出异常时,它们不会被路由器中的onerror方法处理。
启动日志:
o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router
我们正在使用Spring Cloud流和Kafka活页夹配置的所有默认设置:
spring.cloud:
stream:
binders:
kafka:
type: kafka
environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
environment.spring.cloud.stream.kafka.binder.zkNodes=zklist
编辑:添加的pod参数来自
kubectl describe <pod>
Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher
我们尝试的另一个想法是使用Spring Cloud流-Spring集成错误通道支持向代理发送有关错误的主题,但由于消息似乎根本没有在全局Spring集成错误通道中登录,因此也不起作用。
在SCDF Kubernetes中,我们需要做什么特殊的事情来启用全球Spring集成错误通道?
我这里缺什么?
使用注释中的解决方案更新:
在检查了您的配置之后,我现在非常确定我知道什么
问题是。您有一个多活页夹配置场景。即使
您只需处理一个绑定器实例
春天,云,流,捆绑……是什么使框架
将其视为多粘合剂。基本上这是个错误-
github.com/spring-cloud/spring-cloud-stream/issues/1384.正如你所能
看到它被修复了,但您需要升级到elmhurst.sr2或获取
最新的快照(我们在RC2中,2.1.0版本在几周后发布)
无论如何)奥列格·朱拉库斯基
这确实是我们设置的问题。我们没有升级,只是暂时取消了多活页夹的使用,问题解决了。