代码之家  ›  专栏  ›  技术社区  ›  Punter Vicky

Spring Kafka consumer-带恢复回调机制的手动提交

  •  1
  • Punter Vicky  · 技术社区  · 7 年前

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(conncurrency);
            factory.setConsumerFactory(consumerFactory());
            factory.setRetryTemplate(retryTemplate());
                    factory.setRecoveryCallback(new RecoveryCallback<Object>() {
            @Override
            public Object recover(RetryContext context) throws Exception {
                // TODO Auto-generated method stub
                logger.debug(" In recovery callback method !!");
                return null;
            }
        });
            factory.getContainerProperties().setAckMode(AckMode.MANUAL);
            return factory;
        }
    
        /*
         * Retry template.
         */
    
        protected RetryPolicy retryPolicy() {
            SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
            return policy;
        }
    
        protected BackOffPolicy backOffPolicy() {
            ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
            policy.setInitialInterval(initialRetryInterval);
            policy.setMultiplier(retryMultiplier);
            return policy;
        }
    
        protected RetryTemplate retryTemplate() {
           RetryTemplate template = new RetryTemplate();
           template.setRetryPolicy(retryPolicy());
           template.setBackOffPolicy(backOffPolicy());
           return template;
        }
    }
    
    1 回复  |  直到 7 年前
        1
  •  3
  •   Artem Bilan    7 年前

    在框架中没有任何假设,如果在消费错误期间重试耗尽,您可以做什么。

    我认为你应该从 Spring Retry RecoveryCallback

    如果在模板决定中止之前业务逻辑没有成功,那么客户机就有机会通过恢复回调进行一些替代处理。

    A. RetryContext

    /**
     * Accessor for the exception object that caused the current retry.
     * 
     * @return the last exception that caused a retry, or possibly null. It will be null
     * if this is the first attempt, but also if the enclosing policy decides not to
     * provide it (e.g. because of concerns about memory usage).
     */
    Throwable getLastThrowable();
    

    RetryContext 恢复回调 : https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#_retrying_deliveries

    的内容 RetryContext 传递到 将取决于侦听器的类型。上下文将始终具有属性 record acknowledgment 和/或 consumer RetryingAcknowledgingMessageListenerAdapter 为这些键提供静态常数。有关更多信息,请参阅其javadocs。