代码之家  ›  专栏  ›  技术社区  ›  Javed

一旦交付,就可以通过Spring Cloud Stream Binder Kafka或Spring Kafka使用

  •  0
  • Javed  · 技术社区  · 6 年前

    我正试图在SpringBoot应用程序中使用SpringCloud流绑定器Kafka实现一次性交付。 我使用的版本是:

    • spring-cloud-stream-binder-kafka-core-1.2.1.发布
    • spring-cloud-stream-binder-kafka-1.2.1.发布
    • spring-cloud-stream-codec-1.2.2.发布spring-kafka-1.1.6.发布
    • Spring-Integration-Kafka-2.1.0.发布
    • Spring-Integration-Core-4.3.10.发布
    • 动物园管理员-3.4.8
    • 卡夫卡版本:0.10.1.1

    这是我的配置(云配置):

        spring:
          autoconfigure:
            exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
          kafka:
            consumer:
              enable-auto-commit: false
          cloud:
            stream:
              kafka:
                binder:
                  brokers: "${BROKER_HOST:xyz-aws.local:9092}"
                  headers:
                    - X-B3-TraceId
                    - X-B3-SpanId
                    - X-B3-Sampled
                    - X-B3-ParentSpanId
                    - X-Span-Name
                    - X-Process-Id
                  zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
                bindings:
                  feed_platform_events_input:
                    consumer:
                      autoCommitOffset: false
              binders:
                xyzkafka:
                  type: kafka
              bindings:
                feed_platform_events_input:
                  binder: xyzkafka
                  destination: platform-events
                  group: br-platform-events
    

    我有两个主要班: 进料槽接口:

    package au.com.xyz.proxy.interfaces;
    import org.springframework.cloud.stream.annotation.Input; 
    import org.springframework.messaging.MessageChannel;
    
    public interface FeedSink {
    
    String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";
    
    @Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
    MessageChannel feedlatformEventsInput();
    } 
    

    事件消费者

    package au.com.xyz.proxy.consumer;
    
    @Slf4j
    @EnableBinding(FeedSink.class)
    public class EventConsumer {
    
        public static final String SUCCESS_MESSAGE =
                "SEND-SUCCESS : Successfully sent message to platform.";
        public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
        public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
        public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
                "EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";
    
        @Autowired
        private CapPointService service;
    
        @StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
        /**
         * method associated with stream to process message.
         */
        public void message(final @Payload EventNotification eventNotification,
                            final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    
            String caseMilestone = "UNKNOWN";
            if (!ObjectUtils.isEmpty(eventNotification)) {
                SysMessage sysMessage = processPayload(eventNotification);
                caseMilestone = sysMessage.getCaseMilestone();
                try {
                    ClientResponse response = service.sendPayload(sysMessage);
                    if (response.hasFault()) {
                        Fault faultDetails = response.getFaultDetails();
                        log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
                    } else {
                        log.info(SUCCESS_MESSAGE);
                    }
                    acknowledgment.acknowledge();
                } catch (Exception e) {
                    log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
                }
            } else {
                log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
                acknowledgment.acknowledge();
            }
        }
    
    
    
        private SysMessage processPayload(final EventNotification eventNotification) {
            Gson gson = new Gson();
            String jsonString =  gson.toJson(eventNotification.getData());
            log.info("Consumed message for platform events with payload : {} ", jsonString);
            SysMessage sysMessage = gson.fromJson(jsonString, SysMessage.class);
            return sysMessage;
        }
        }
    

    我已经将kafka和spring容器的autocommit属性设置为false。 如果您在eventconsumer类中看到,在i service.sendpayload成功且没有异常的情况下,我使用了acknowledge。我希望容器移动偏移量并轮询下一个记录。 我所观察到的是:

    • 场景1-在异常被抛出并且在Kafka上没有发布新消息的情况下。没有重试处理消息,似乎没有活动。即使基础问题得到解决。我提到的问题是下游服务器不可用。是否有方法重试处理n次,然后放弃。注意:这是从上次提交的偏移量重新处理或重新调用的尝试。这不是关于Kafka实例不可用。 如果我重新启动服务(EC2实例),那么处理将从上次成功确认完成的偏移量开始。

    • 场景2-如果发生异常,然后将后续消息推送到Kafka。我看到新消息被处理,偏移量被移动。这意味着我丢失了未被确认的信息。所以问题是我是否已经处理了确认。如何控制从上一次提交读取而不仅仅是最新消息并进行处理?我假设有一个内部的民意调查发生,它没有考虑到或不知道最后一条消息没有被确认。我不认为有多个线程从卡夫卡阅读。我不知道@input和@streamlistener注释是如何控制的。我假设线程由控制线程的属性consumer.concurrency控制,默认情况下它设置为1。

    所以我做了研究,发现了很多链接,但不幸的是没有一个回答我的具体问题。 我看着( https://github.com/spring-cloud/spring-cloud-stream/issues/575 ) 有马吕斯的评论( https://stackoverflow.com/users/809122/marius-bogoevici ):

    请注意,Kafka不提供单独的消息确认,这是 意味着确认转化为更新最新使用的 偏移量到确认消息的偏移量(每个主题/分区)。那个 意味着,如果您将来自同一主题的消息分块打包 当然,消息可以“确认”之前的所有消息。

    当有一个线程时,不确定是否是订单问题。

    很抱歉发了这么长的帖子,但我想提供足够的信息。最重要的是,我试图避免在从卡夫卡消费时丢失消息,我试图看看SpringCloudstreambinder卡夫卡是否能做到这一点,或者我必须寻找替代方案。

    更新日期:2018年7月6日

    我看到这个帖子了 https://github.com/spring-projects/spring-kafka/issues/431 这是解决我问题的更好方法吗?我可以试试最新版本的春季卡夫卡

    @KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
                    containerGroup = "quxGroup")
    public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
    
    • 这有助于控制要设置到最后一个位置的偏移量吗? 是否成功处理记录?我怎么能听你这么说呢 方法。consumer.seektoend();然后如何重置listen方法以获取该记录?
    • 把消费者放在签名中是否提供支持 处理消费者问题?或者我还需要做什么?
    • 我应该使用Acknowledge还是Consumer.Commitsyncy()
    • 集装箱工厂的意义是什么?我必须定义它吗 像豆子一样。
    • 我是否需要@enablekafka和@configuration才能使用上述方法? 记住,该应用程序是弹簧引导应用程序。
    • 通过添加Consumer to Listen方法,我不需要实现 消费软件接口?

    最后但并非最不重要的是,如果可行,是否可以提供上述方法的一些示例。

    更新日期:2018年7月12日

    谢谢,加里( https://stackoverflow.com/users/1240763/gary-russell )提供使用maxattempts的提示。我用过那种方法。我能够实现一次交付,并保持消息的顺序。

    我更新的云配置:

        spring:
          autoconfigure:
            exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
          kafka:
            consumer:
              enable-auto-commit: false
          cloud:
            stream:
              kafka:
                binder:
                  brokers: "${BROKER_HOST:xyz-aws.local:9092}"
                  headers:
                    - X-B3-TraceId
                    - X-B3-SpanId
                    - X-B3-Sampled
                    - X-B3-ParentSpanId
                    - X-Span-Name
                    - X-Process-Id
                  zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
                bindings:
                  feed_platform_events_input:
                    consumer:
                      autoCommitOffset: false
              binders:
                xyzkafka:
                  type: kafka
              bindings:
                feed_platform_events_input:
                  binder: xyzkafka
                  destination: platform-events
                  group: br-platform-events
                  consumer:
                    maxAttempts: 2147483647
                    backOffInitialInterval: 1000
                    backOffMaxInterval: 300000
                    backOffMultiplier: 2.0
    

    事件使用者与我的初始实现保持相同。除非再次引发错误,否则容器将知道处理失败。如果您只是捕获它,那么容器就无法知道消息处理有失败。通过执行Acknoweldgement.Acknowledge,您只需控制偏移提交。为了重试,您必须抛出异常。不要忘记将kafka客户机autocommit属性和spring(容器级别)autocommitoffset属性设置为false。就是这样。

    1 回复  |  直到 6 年前
        1
  •  1
  •   Gary Russell    6 年前

    enableDlq

    ContainerStoppingErrorHandler SeekToCurrentErrorHandler