代码之家  ›  专栏  ›  技术社区  ›  singe3

Spring Kafka 2.2类型映射类加载器不匹配

  •  3
  • singe3  · 技术社区  · 6 年前

    我尝试使用SpringKafka 2.2中引入的新类型映射功能:

    向下滚动到“映射类型”:
    https://docs.spring.io/spring-kafka/reference/htmlsingle/#serdes

    在生产者方面,我注册了一个映射,如下所示:

    senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    senderProps.put(JsonSerializer.TYPE_MAPPINGS, "foo:com.myfoo.Foo");
    

    对消费者的评价如下:

    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "foo:com.yourfoo.Foo");
    

    但是,在向卡夫卡发送事件时, com.myfoo.Foo 类,添加到记录的ClassID头是 Fo.MyFo.Foo 而不是 foo . 因此,在使用者方面,它未能反序列化,因为 Fo.MyFo.Foo 是未知的。

    我在春季卡夫卡把问题缩小到了这种方法:

    https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/converter/AbstractJavaTypeMapper.java#L142

    protected void addHeader(Headers headers, String headerName, Class<?> clazz) {
        if (this.classIdMapping.containsKey(clazz)) {
            headers.add(new RecordHeader(headerName, this.classIdMapping.get(clazz)));
        }
        else {
            headers.add(new RecordHeader(headerName, clazz.getName().getBytes(StandardCharsets.UTF_8)));
        }
    }
    

    当通过执行kafka记录的序列化进行调试时,实际上执行到了else分支,而在我的理解中,它实际上应该转到if分支并添加 作为标题。相反,它增加了 Fo.MyFo.Foo 到页眉。

    罪魁祸首似乎与类加载器不匹配有关,但我不确定这是否真的是一个bug或是我在自己身上做的愚蠢的事情。

    基本上, classIdMapping 地图填写正确 Fo.MyFo.Foo 作为键和的utf-8 byte[]表示 作为相应的值。

    但是,在国际单项体育联合会的比赛中, clazz 参数是由不同的类加载器加载的类,与存储在 锁相映射 映射,因此散列代码是不同的,它将转到else分支。

    它实际上是卡夫卡方面的一个bug,还是我配置错误了?

    谢谢

    1 回复  |  直到 6 年前
        1
  •  2
  •   Artem Bilan    6 年前

    这看起来确实是我们实现中的一个遗漏。请针对Apache Kafka项目提出一个问题: https://github.com/spring-projects/spring-kafka . 我们的想法是不要用 Class<?> 用于映射,但它 完全限定的类名 .

    您的应用程序似乎是多类加载器,例如,它是WebOne。ApacheKafka客户端加载 senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); 同时,在它的类加载器中,其他一切都由应用程序上下文类加载器完成。

    作为一种解决方法,我们在解决问题的同时,建议使用 DefaultKafkaProducerFactory 配置及其 setValueSerializer() 填充 JsonSerializer bean引用。这样,您需要通过 setTypeMapper() 因此, setIdClassMapping() DefaultJackson2JavaTypeMapper .