代码之家  ›  专栏  ›  技术社区  ›  Annu

使用Spring Kafka添加自定义标题

  •  7
  • Annu  · 技术社区  · 6 年前

    我计划使用Spring Kafka客户端在Spring Boot应用程序中使用和生成来自Kafka设置的消息。我在Kafka 0.11中看到了对自定义标题的详细支持 here . 虽然它可供本地卡夫卡制作者和消费者使用,但我看不到对在Spring Kafka中添加/读取自定义标题的支持。

    我正在尝试根据重试次数为消息实现DLQ,我希望将其存储在消息头中,而无需解析负载。

    2 回复  |  直到 6 年前
        1
  •  8
  •   Artem Bilan    6 年前

    Spring Kafka提供了headers支持 版本2.0 : https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers

    你可以拥有它 KafkaHeaderMapper 实例,并使用它填充 Message 在通过发送之前 KafkaTemplate.send(Message<?> message) . 或者你可以用平原 KafkaTemplate.send(ProducerRecord<K, V> record) .

    当您使用接收记录时 KafkaMessageListenerContainer 这个 卡夫卡·海德堡 可通过 MessagingMessageConverter 注入到 RecordMessagingMessageListenerAdapter .

    因此,任何自定义标头都可以通过任何一种方式进行传输。

        2
  •  7
  •   BitfulByte    4 年前

    当我偶然发现这个问题时,我正在寻找答案。但是我使用 ProducerRecord<?, ?> 类而不是 Message<?> ,因此标题映射器似乎不相关。

    下面是我添加自定义标题的方法:

    var record = new ProducerRecord<String, String>(topicName, "Hello World");
    record.headers().add("foo", "bar".getBytes());
    kafkaTemplate.send(record);
    

    现在,为了阅读标题(在使用之前),我添加了一个自定义拦截器。

    import java.util.List;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    
    @Slf4j
    public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
    
        @Override
        public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
            Set<TopicPartition> partitions = records.partitions();
            partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));
    
            return records;
        }
    
        private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
            records.forEach(record -> {
                var myHeaders = new ArrayList<Header>();
                record.headers().headers("MyHeader").forEach(myHeaders::add);
                log.info("My Headers: {}", myHeaders);
                // Do with header as you see fit
            });
        }
    
        @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
        @Override public void close() {}
        @Override public void configure(Map<String, ?> configs) {}
    }
    

    最后一位是使用以下(Spring Boot)配置向Kafka使用者容器注册此拦截器:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    @Configuration
    public class MessagingConfiguration {
    
        @Bean
        public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
            Map<String, Object> consumerProperties = properties.buildConsumerProperties();
            consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
            return new DefaultKafkaConsumerFactory<>(consumerProperties);
        }
    
    }