我有一个Kafka主题,其中包含Json格式的数据:
{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}
我想用“引用表”之类的东西规范化内容:
country ( "France" ) -> "FR"
currency ( "£" ) -> "GBP"
为了输出:
{"id": "A", "country": "FR"}
{"id": "B", "currency": "GBP"}
我认为这是使用
KTable
以存储引用数据。但我在执行上有点拘泥。
当前状态
摄取参考数据
卡夫卡专题:
poc-mapping-in
提供了示例Json数据的主题:
{"mapping":"ccy", "from":"£", "to":"GBP"}
{"mapping":"country", "from":"France", "to":"FR"}
接收到的数据
K表
对键和值进行返工后:
KStream<String, String> mappingStream = builder
.stream("poc-mapping-in",consumed)
.map(
(key, value) -> KeyValue.pair(
value.get("mapping")+"#"+value.get("from"),
value.get("to").asText())
);
KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(
Serialized.with(Serdes.String(),Serdes.String() ));
KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
() -> "", //initializer
(aggKey, newValue, aggValue) -> newValue, // adder
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ReferenceStore")
.withValueSerde(Serdes.String())
.withKeySerde(Serdes.String())
);
// Testing
mappingTable.toStream().to("poc-mapping-in-content",
Produced.with(Serdes.String(), Serdes.String()));
在这个话题上
poc-mapping-in-content
,我得到这些行:
"currency"#"£" GBP
"country"#"France" FR
这看起来像我所期望的。双引号很奇怪,但这并不妨碍我进一步。
数据被/应该存储在一个名为
ReferenceStore
.
吸收业务流
卡夫卡主题:
poc-raw-events
提供了示例Json数据的主题:
{“id”:“A”,“country”:“France”}
{“id”:“B”,“currency”:“货币”}
接收到的数据
KStream
:
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
KStream<String, JsonNode> businessData = builder.stream("poc-raw-events", consumed);
从这里开始我不知道该怎么办。技术上,我知道如何更新JsonNode中的属性。所以我试着在
KStream公司
具有
foreach
,这边:
businessData.foreach(new ForeachAction<String, JsonNode>() {
public void apply(String k, JsonNode v) {
System.out.println(k+ " : " +v);
if (v==null) {System.out.println("NULL detected"); return;}
Iterator<Entry<String, JsonNode>> fields = v.fields();
int i=0;
while (fields.hasNext()) {
i++;
Entry<String, JsonNode> next = fields.next();
System.out.println(k+ " field #"+i+" : " +next.getKey() + " -- " + next.getValue());
String key = next.getKey() + "#" + next.getValue());
// ((ObjectNode) v).put(next.getKey(), " WHAT HERE ??? ");
}
}
});
我的想法是替换
" WHAT HERE ??? "
最后一行中的数据出现在引用KTable中。但是怎么做呢???
-
我没有找到像
.findByKey()
在KTable上。
-
我不知道如何进入
参考存储
本地商店,因为进入它的方式有点像
myKafkaStream.store(...)
此时此刻
myKafkaStream
还没有开始,甚至还没有建成。
另一种方法是使用KStream leftJoin KTable功能。但是我在某个地方读到(我没有书签…)要做到这一点,我们应该在两个K表中使用相同的键。但在我的例子中,在Json方面,我不处理要连接的键,而是处理一个简单的属性。
你将如何实现这一点?