我有一个带有输入主题的kafka Streams应用程序
input
其中以下记录作为json日志提供:
JSON日志:
{"CreationTime":"2018-02-12T12:32:31","UserId":"abc@gmail.com","Operation":"upload","Workload":"Drive"}
我正在根据以下主题构建流:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
接下来我想分组
"UserId"
并查找每个用户的计数。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
final KTable<String, Long> wordCounts = source_user_activity
.flatMap((key, value) -> {
List<KeyValue<String, String>> result = new LinkedList<>();
JSONObject valueObject = new JSONObject(value);
result.add(KeyValue.pair((valueObject.get("UserId").toString()), valueObject.toString()));
return result;
})
.groupByKey()
.count();
wordCounts.toStream().to("output",Produced.with(stringSerde, longSerde));
wordCounts.print();
接下来,我将从
output
主题使用
console-consumer
.我没有看到任何文本,它只是这样的东西:
然而
wordCounts.print()
显示以下内容:
[KSTREAM-AGGREGATE-0000000003]: abc@gmail.com, (1<-null)
我做错了什么?谢谢