代码之家  ›  专栏  ›  技术社区  ›  Val Bonn

如何使用KTable作为引用数据来更新KStream?

  •  0
  • Val Bonn  · 技术社区  · 6 年前

    我有一个Kafka主题,其中包含Json格式的数据:

    {"id": "A", "country": "France"}
    {"id": "B", "currency": "£"}
    

    我想用“引用表”之类的东西规范化内容:

    country ( "France" ) -> "FR"
    currency ( "£" ) -> "GBP"
    

    为了输出:

    {"id": "A", "country": "FR"}
    {"id": "B", "currency": "GBP"}
    

    我认为这是使用 KTable 以存储引用数据。但我在执行上有点拘泥。

    当前状态

    摄取参考数据

    卡夫卡专题: poc-mapping-in

    提供了示例Json数据的主题:

    {"mapping":"ccy",     "from":"£",      "to":"GBP"}
    {"mapping":"country", "from":"France", "to":"FR"}
    

    接收到的数据 K表 对键和值进行返工后:

             KStream<String, String> mappingStream = builder
                    .stream("poc-mapping-in",consumed)
                    .map(
                         (key, value) -> KeyValue.pair(
                             value.get("mapping")+"#"+value.get("from"), 
                             value.get("to").asText())
             );
    
             KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(
                     Serialized.with(Serdes.String(),Serdes.String() ));
    
    
             KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
                    () -> "", //initializer 
                    (aggKey, newValue, aggValue) -> newValue, // adder 
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ReferenceStore")
                        .withValueSerde(Serdes.String())
                        .withKeySerde(Serdes.String())
                    );
    
               // Testing
               mappingTable.toStream().to("poc-mapping-in-content", 
                    Produced.with(Serdes.String(), Serdes.String()));
    

    在这个话题上 poc-mapping-in-content ,我得到这些行:

    "currency"#"£"      GBP
    "country"#"France"  FR
    

    这看起来像我所期望的。双引号很奇怪,但这并不妨碍我进一步。

    数据被/应该存储在一个名为 ReferenceStore .

    吸收业务流

    卡夫卡主题: poc-raw-events

    提供了示例Json数据的主题:

    {“id”:“A”,“country”:“France”}
    {“id”:“B”,“currency”:“货币”}
    

    接收到的数据 KStream :

      final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
      KStream<String, JsonNode> businessData = builder.stream("poc-raw-events", consumed);
    

    从这里开始我不知道该怎么办。技术上,我知道如何更新JsonNode中的属性。所以我试着在 KStream公司 具有 foreach ,这边:

        businessData.foreach(new ForeachAction<String, JsonNode>()  {
            public void apply(String k, JsonNode v) {
                System.out.println(k+ " : " +v);
                        if (v==null) {System.out.println("NULL detected"); return;}
                Iterator<Entry<String, JsonNode>> fields = v.fields();
                int i=0;
                while (fields.hasNext()) {
                    i++;
                    Entry<String, JsonNode> next = fields.next();
                    System.out.println(k+ " field #"+i+" : " +next.getKey() + " -- " + next.getValue());
    
                    String key = next.getKey() + "#" + next.getValue());
    //              ((ObjectNode) v).put(next.getKey(), "  WHAT HERE ??? ");
    
                }
    
            }
        });
    

    我的想法是替换 " WHAT HERE ??? " 最后一行中的数据出现在引用KTable中。但是怎么做呢???

    • 我没有找到像 .findByKey() 在KTable上。
    • 我不知道如何进入 参考存储 本地商店,因为进入它的方式有点像 myKafkaStream.store(...) 此时此刻 myKafkaStream 还没有开始,甚至还没有建成。

    另一种方法是使用KStream leftJoin KTable功能。但是我在某个地方读到(我没有书签…)要做到这一点,我们应该在两个K表中使用相同的键。但在我的例子中,在Json方面,我不处理要连接的键,而是处理一个简单的属性。

    你将如何实现这一点?

    2 回复  |  直到 6 年前
        1
  •  4
  •   bbejeck    6 年前

    既然你用的是参考数据,我想你要考虑的是 GlobalKTable . 一个 球形 根据完全复制 KafkaStreams 实例和是显式创建的,用于保存上述用例的引用数据。

    KStream GlobalKTable联接的独特之处在于,您可以使用 KeyValue 要映射到 球形 . 所以只要你能从 JsonNode ,您应该能够加入到 GlobalKTable.

        2
  •  0
  •   charlb    6 年前

    如果referenceKTable的键与data.getAltKey()匹配

    streamToMap.selectKey((originalKey, data) -> data.getAltKey()).leftJoin(referenceKTable, valueJoiner)
    

    可以做到。valueJoiner(或lambda)的实现必须组合这两个输入。