代码之家  ›  专栏  ›  技术社区  ›  Davide Cerbo

融合Kafka Avro序列化器和Spring Cloud

  •  0
  • Davide Cerbo  · 技术社区  · 6 年前

    我正在尝试使用Spring Cloud和Kafka Avro序列化器在Kafka上生成一个事件。

    在application.yml中,我有下面的配置,但是当序列化程序尝试生成消息时,它会生成字节,因为传递给KafkaSerializer中getScheme方法的对象是字节数组,而不是GenericRecord。我认为我需要在Spring Cloud中使用一个特定的消息转换器,但是我没有找到。

    cloud:
    stream:
      kafka:
        binder:
          brokers:
            - 'localhost:9092'
          useNativeDecoding: true
        bindings:
          Ptr-output:
              producer:
                configuration:
                  schema.registry.url: 'http://localhost:8081'
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: com.abc.message.ptr.KafkaSerializer
      schemaRegistryClient:
        endpoint: 'http://localhost:8081'
      bindings:
         Ptr-output:
          contentType: application/*+avro
          destination: Ptr
      schema:
        avro:
          schema-locations: 'classpath:avro/Ptr.avsc'
          dynamic-schema-generation-enabled: false
    

    我该怎么办?我该怎么解决?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Gary Russell    6 年前

    the documentation 尤其是,您需要设置producer属性 useNativeEncoding .

    使用编码

    当设置为true时,出站消息由客户端库直接序列化,客户端库必须进行相应的配置(例如,设置适当的Kafka producer值序列化程序)。使用此配置时,出站消息封送处理不基于绑定的contentType。使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka使用者值反序列化器)来反序列化入站消息。此外,当使用本机编码和解码时,headerMode=embeddedHeaders属性将被忽略,并且消息中不嵌入头。请参见消费者属性useNativeDecoding。