代码之家  ›  专栏  ›  技术社区  ›  pclem12

Spring Cloud数据流错误通道不工作

  •  0
  • pclem12  · 技术社区  · 6 年前

    我正试图为我的SpringCloud数据流创建一个定制的异常处理程序,以路由一些需要重新排队的错误和其他需要dlq'd的错误。

    为此,我使用了全局Spring集成“ErrorChannel”和基于异常类型的路由。

    这是Spring集成错误路由器的代码:

    package com.acme.error.router;
    
    import com.acme.exceptions.DlqException;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.integration.annotation.MessageEndpoint;
    import org.springframework.integration.annotation.Router;
    import org.springframework.integration.transformer.MessageTransformationException;
    import org.springframework.messaging.Message;
    
    
    @MessageEndpoint
    @EnableBinding({ ErrorMessageChannels.class })
    public class ErrorMessageMappingRouter {
       private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);
    
       public static final String ERROR_CHANNEL = "errorChannel";
    
       @Router(inputChannel = ERROR_CHANNEL)
        public String onError(Message<Object> message) {
          LOGGER.debug("ERROR ROUTER - onError");
          if(message.getPayload() instanceof MessageTransformationException) {
             MessageTransformationException exception = (MessageTransformationException) message.getPayload();
             Message<?> failedMessage = exception.getFailedMessage();
              if(exceptionChainContainsDlq(exception)) {
                 return ErrorMessageChannels.DLQ_QUEUE_NAME;
              }
             return ErrorMessageChannels.REQUEUE_CHANNEL;
          }
          return ErrorMessageChannels.DLQ_QUEUE_NAME;
        }
    
        ...
    
    }
    

    错误路由器由每个流应用程序通过Spring引导应用程序上的包扫描为每个应用程序拾取:

    @ComponentScan(basePackages = { "com.acme.error.router" }
    @SpringBootApplication
    public class StreamApp {}
    

    当部署并运行本地Spring Cloud数据流服务器(1.5.0版)时,如果抛出了DLQException,则消息将成功路由到ErrorRouter中的OnError方法,然后放入DLQ主题。

    但是,当使用scdf kubernetes服务器(也是版本1.5.0-release)将其部署为docker容器时,永远不会命中onerror方法。(路由器开头的日志语句从不输出)

    在流应用程序的启动日志中,bean似乎被正确地拾取并注册为ErrorChannel的侦听器,但是出于某种原因,当抛出异常时,它们不会被路由器中的onerror方法处理。

    启动日志:

    o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
    o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
    o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router
    

    我们正在使用Spring Cloud流和Kafka活页夹配置的所有默认设置:

    spring.cloud:
      stream:
        binders:
          kafka:
            type: kafka
            environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
            environment.spring.cloud.stream.kafka.binder.zkNodes=zklist
    

    编辑:添加的pod参数来自 kubectl describe <pod>

    Args:
    --spring.cloud.stream.bindings.input.group=delivery-stream
    --spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
    --spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
    --spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
    --spring.cloud.stream.binders.xdkafka.type=kafka
    --spring.cloud.stream.binders.xdkafka.defaultCandidate=true
    --spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
    --spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher
    

    我们尝试的另一个想法是使用Spring Cloud流-Spring集成错误通道支持向代理发送有关错误的主题,但由于消息似乎根本没有在全局Spring集成错误通道中登录,因此也不起作用。

    在SCDF Kubernetes中,我们需要做什么特殊的事情来启用全球Spring集成错误通道?

    我这里缺什么?

    使用注释中的解决方案更新:

    在检查了您的配置之后,我现在非常确定我知道什么 问题是。您有一个多活页夹配置场景。即使 您只需处理一个绑定器实例 春天,云,流,捆绑……是什么使框架 将其视为多粘合剂。基本上这是个错误- github.com/spring-cloud/spring-cloud-stream/issues/1384.正如你所能 看到它被修复了,但您需要升级到elmhurst.sr2或获取 最新的快照(我们在RC2中,2.1.0版本在几周后发布) 无论如何)奥列格·朱拉库斯基

    这确实是我们设置的问题。我们没有升级,只是暂时取消了多活页夹的使用,问题解决了。

    1 回复  |  直到 6 年前
        1
  •  0
  •   pclem12    6 年前

    使用注释中的解决方案更新:

    在检查了您的配置之后,我现在非常确定我知道什么 问题是。您有一个多活页夹配置场景。即使 您只需处理一个绑定器实例 春天,云,流,捆绑……是什么使框架 将其视为多粘合剂。基本上这是个错误- github.com/spring-cloud/spring-cloud-stream/issues/1384.正如你所能 看到它被修复了,但您需要升级到elmhurst.sr2或获取 最新的快照(我们在RC2中,2.1.0版本在几周后发布) 无论如何)奥列格·朱拉库斯基

    这确实是我们设置的问题。我们没有升级,只是暂时取消了多活页夹的使用,问题解决了。