代码之家  ›  专栏  ›  技术社区  ›  chris01

从ConsumerRecord获取序列化字符串是热门吗?

  •  0
  • chris01  · 技术社区  · 2 年前

    我尝试使用带有JSON的Kafka作为值的格式。为了在我的使用者中自动处理反序列化,并进行日志记录,我为JsonDeserializer和自定义ErrorHandler设置了它。

    application.properties

    spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    spring.kafka.properties.spring.deserializer.value.delegate.class = org.springframework.kafka.support.serializer.JsonDeserializer
    

    KafkaConfig.java

    @Configuration
    @EnableKafka
    public class KafkaConfig 
    {
        @Bean
        public DefaultErrorHandler errorHandler (KafkaTemplate <String, String> template) 
        {
            return new DefaultErrorHandler ()
            {
                @Override
                public void handleRemaining (Exception ex, java.util.List<ConsumerRecord <?, ?>> records,
                Consumer<?, ?> consumer, MessageListenerContainer container) 
                {
                  // here we do logging stuff
                  System.out.println ("##: " + records.size ());
                  System.out.println ("ex: " + ex.getMessage());
                  ConsumerRecord <?, ?> cr = records.get (0);
                  System.out.println ("ku: " + KafkaUtils.format (cr));
                  System.out.println ("st: " + cr.toString ());
                  System.out.println ("key: " + cr.key ());
                  System.out.println ("val: " + cr.value ());
                  System.out.println ("vsz: "+ cr.serializedValueSize());
                }
            };
        }
    

    到目前为止,一切都很好。

    现在,我想到了一种将值作为字符串的方法,以便将其包含在日志中。

    我如何从ConsumerRecord中获取这些信息?

    到目前为止的输出是这样的,以防出现错误的消息。我只是用kafka的一部分kafka-producer.sh脚本进行测试,然后让我通过控制台发送消息。vsz与我的输入匹配,所以不会完全错。

    ##: 1
    ex: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
    ku: mytopic-0@82
    st: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = 0, offset = 82, CreateTime = 1650039248945, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19,..., 112, 1 100, 102])], isReadOnly = false), key = null, value = null)
    key: null
    val: null
    vsz: 3
    

    我的想法是创建一个StringSerializer,但是cr.value()为null,所以这是一条死胡同。

    0 回复  |  直到 2 年前
        1
  •  1
  •   Gary Russell    2 年前

    你必须打电话 super.handleRemaining() 以便为下一次轮询正确地重新定位记录。

    无法反序列化的值在中可用 DeserializationException ,这是 ListenerExecutionFailedExeption 传递到错误处理程序。

    /**
     * Get the data that failed deserialization (value or key).
     * @return the data.
     */
    public byte[] getData() {
        return this.data; // NOSONAR array reference
    }