我尝试使用带有JSON的Kafka作为值的格式。为了在我的使用者中自动处理反序列化,并进行日志记录,我为JsonDeserializer和自定义ErrorHandler设置了它。
application.properties
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class = org.springframework.kafka.support.serializer.JsonDeserializer
KafkaConfig.java
@Configuration
@EnableKafka
public class KafkaConfig
{
@Bean
public DefaultErrorHandler errorHandler (KafkaTemplate <String, String> template)
{
return new DefaultErrorHandler ()
{
@Override
public void handleRemaining (Exception ex, java.util.List<ConsumerRecord <?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container)
{
// here we do logging stuff
System.out.println ("##: " + records.size ());
System.out.println ("ex: " + ex.getMessage());
ConsumerRecord <?, ?> cr = records.get (0);
System.out.println ("ku: " + KafkaUtils.format (cr));
System.out.println ("st: " + cr.toString ());
System.out.println ("key: " + cr.key ());
System.out.println ("val: " + cr.value ());
System.out.println ("vsz: "+ cr.serializedValueSize());
}
};
}
到目前为止,一切都很好。
现在,我想到了一种将值作为字符串的方法,以便将其包含在日志中。
我如何从ConsumerRecord中获取这些信息?
到目前为止的输出是这样的,以防出现错误的消息。我只是用kafka的一部分kafka-producer.sh脚本进行测试,然后让我通过控制台发送消息。vsz与我的输入匹配,所以不会完全错。
##: 1
ex: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
ku: mytopic-0@82
st: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = 0, offset = 82, CreateTime = 1650039248945, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19,..., 112, 1 100, 102])], isReadOnly = false), key = null, value = null)
key: null
val: null
vsz: 3
我的想法是创建一个StringSerializer,但是cr.value()为null,所以这是一条死胡同。