
Exception while processing data in Kafka Streams

  •  4
  •  Stella  ·  asked 6 years ago

    I am processing a Kafka stream with the code below. I check a filter condition on the JSON object, namely "UserID":"1". Please refer to the following code:

    builder.<String, String>stream(Serdes.String(), Serdes.String(), topic)
           .filter(new Predicate<String, String>() {

               String userIDCheck = null;

               @Override
               public boolean test(String key, String value) {
                   try {
                       JSONObject jsonObj = new JSONObject(value);
                       userIDCheck = jsonObj.get("UserID").toString();
                       System.out.println("userIDCheck: " + userIDCheck);
                   } catch (JSONException e) {
                       // TODO Auto-generated catch block
                       e.printStackTrace();
                   }
                   return userIDCheck.equals("1");
               }
           })
           .to(streamouttopic);
    

    Value: {"UserID":"1","Address":"XXX","AccountNo":"989","UserName":"Stella","AccountType":"YYY"}

    I get the following error:

        Exception in thread "SampleStreamProducer-4eecc3ab-858c-44a4-9b8c-5ece2b4ab21a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=testtopic1, partition=0, offset=270
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
        ... 3 more
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:89)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    

    From the streams code above, the value and the filter condition look fine, so I cannot understand why this exception is thrown while the streams code is executing.

    1 Answer  |  6 years ago
  •  10
  •  Matthias J. Sax  ·  3 years ago

    The reported issue should only apply to Kafka 2.0 and older. Since the 2.1.0 release, Kafka Streams supports serde "push-down", and the to() operator should inherit the correct serdes from the upstream operator (cf. https://issues.apache.org/jira/browse/KAFKA-7456).
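
    As an illustration (a sketch, not part of the original answer, assuming Kafka Streams 2.1+ with the StreamsBuilder API): once the serdes are declared at the source, to() inherits them and needs no Produced parameter. The topic names and the UserID check come from the question, the application id appears in the thread name of the stack trace, and the class name and broker address are placeholders:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.json.JSONException;
    import org.json.JSONObject;

    public class UserIdFilterApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SampleStreamProducer");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("testtopic1",
                        Consumed.with(Serdes.String(), Serdes.String()))
                   .filter((key, value) -> {
                       try {
                           // same filter condition as in the question
                           return "1".equals(new JSONObject(value).get("UserID").toString());
                       } catch (JSONException e) {
                           return false; // skip records that are not valid JSON
                       }
                   })
                   // serde push-down: to() reuses the String serdes from the source
                   .to("streamouttopic");

            new KafkaStreams(builder.build(), props).start();
        }
    }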

    For Kafka 2.0 and older, you must specify the serdes for the to() operation explicitly. Otherwise, it uses the default serdes from the StreamsConfig, which is ByteArraySerde (because serde overwrites are "in-place overwrites" for a single operator), and a String cannot be cast to byte[].

    You need to do the following:

    .to(streamouttopic, Produced.with(Serdes.String(), Serdes.String()));
    

    For older Kafka versions that do not have the Produced parameter, the code would be:

    .to(Serdes.String(), Serdes.String(), streamouttopic);
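
    Alternatively (a sketch that is not in the answer above, following the hint in the exception message and assuming Kafka 1.0 or newer, where these config keys exist), the default serdes can be changed in the StreamsConfig so that String serialization applies to every operator unless overridden. The application id comes from the thread name in the stack trace and the broker address is a placeholder:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SampleStreamProducer");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // use String instead of ByteArray as the default key/value serde
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    Note that this changes the defaults for the whole topology, so the explicit Produced (or serde-parameter) variant above remains the more targeted fix.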