代码之家  ›  专栏  ›  技术社区  ›  kellanburket

如何流到全局kafka表

  •  3
  • kellanburket  · 技术社区  · 6 年前

    我有一个Kafka Streams应用程序,它需要针对全局表加入一个传入流,然后经过一些处理,将聚合的结果写回该表:

    KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore(
        storeName
    );
    
    Materialized<String, String, KeyValueStore<Bytes, byte[]>> m = Materialized.as(
        supplier
    );
    
    GlobalKTable<String, String> table = builder.globalTable(
        topic, m.withKeySerde(
            Serdes.String()
        ).withValueSerde(
            Serdes.String()
        )
    );
    
    stream.leftJoin(
        table
        ...
    ).groupByKey().aggregate(
        ... 
    ).toStream().through(
        topic, Produced.with(Serdes.String(), Serdes.String())
    );
    

    但是,当我尝试流入ktable changelog时,我得到以下错误: Invalid topology: Topic 'topic' has already been registered by another source.

    如果我尝试聚合到存储本身,我会得到以下错误: InvalidStateStoreException: Store 'store' is currently closed .

    如何对表进行联接并将其写回变更日志?

    如果这是不可能的,一个涉及根据存储筛选传入日志的解决方案也可以工作。

    1 回复  |  直到 6 年前
        1
  •  4
  •   Matthias J. Sax    6 年前

    打电话 through() 是的快捷方式

    stream.to("topic");
    KStream stream2 = builder.stream("topic");
    

    因为你用 builder.stream("topic") 你已经知道了 Invalid topology: Topic 'topic' has already been registered by another source. 因为每个主题只能使用一次。如果要将流/主题的数据馈送到不同的部分,则需要重用原始数据 KStream 您为此主题创建了:

    kstream stream=builder.stream(“topic”);

    // this won't work
    KStream stream2 = stream.through("topic");
    
    // rewrite to
    stream.to("topic");
    KStream stream2 = stream; // or just omit `stream2` and reuse `stream`
    

    不知道你说的是什么意思

    如果我试图聚集到商店本身