代码之家  ›  专栏  ›  技术社区  ›  el323

Kafka流:通过Json日志中的键进行分组

  •  2
  • el323  · 技术社区  · 6 年前

    我有一个带有输入主题的kafka Streams应用程序 input 其中以下记录作为json日志提供:

    JSON日志:

    {"CreationTime":"2018-02-12T12:32:31","UserId":"abc@gmail.com","Operation":"upload","Workload":"Drive"}

    我正在根据以下主题构建流:

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source_user_activity = builder.stream("input");
    

    接下来我想分组 "UserId" 并查找每个用户的计数。

    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();
    
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source_user_activity = builder.stream("input");
    
    final KTable<String, Long> wordCounts = source_user_activity
            .flatMap((key, value) -> {
                List<KeyValue<String, String>> result = new LinkedList<>();
                JSONObject valueObject = new JSONObject(value);
                result.add(KeyValue.pair((valueObject.get("UserId").toString()), valueObject.toString()));
                return result;
            })
            .groupByKey()
            .count();
    
    wordCounts.toStream().to("output",Produced.with(stringSerde, longSerde));
    wordCounts.print();
    

    接下来,我将从 output 主题使用 console-consumer .我没有看到任何文本,它只是这样的东西:

    然而 wordCounts.print() 显示以下内容:

    [KSTREAM-AGGREGATE-0000000003]: abc@gmail.com, (1<-null)

    我做错了什么?谢谢

    enter image description here

    1 回复  |  直到 5 年前
        1
  •  2
  •   Matthias J. Sax    6 年前

    值的数据编码为 long (您正在使用 LongSerde 对于值)和控制台使用者用户 StringDeserializer 默认情况下,它无法正确地反序列化该值。

    您需要指定 LongDeserializer 通过控制台使用者的命令行参数获取值。