代码之家  ›  专栏  ›  技术社区  ›  xmar

如何使用处理器api访问dsl创建的ktable/globalktable?

  •  2
  • xmar  · 技术社区  · 6 年前

    我使用的是处理器api(papi)拓扑。

    是否可以从处理器api中访问使用dsl创建的ktable(或globalktable)(即使是只读的)?

    即使用:

    val builder = new StreamsBuilder()
    val KTable = builder.table("topicname")
    

    我有一个ktable,但是拓扑结构只允许您使用 addStateStore 有一个仓库建造者,而不是KTable本身。

    .addStateStore(myStoreBuilder, MY_PROCESSOR_NAME)
    

    所以我可以这样做:

    def keyValueStoreBuilder[K, V](storeName: String, keySerde: Serde[K], valueSerde: Serde[V]): StoreBuilder[KeyValueStore[K, V]] = {
    Stores.keyValueStoreBuilder(
      Stores.persistentKeyValueStore(storeName),
      keySerde,
      valueSerde)
    

    }

    但是,如何干净地获得 storeName 在这种情况下?

    1 回复  |  直到 5 年前
        1
  •  2
  •   Jacek Laskowski    5 年前

    当您创建 KTable 它将自动在内部创建一个具有生成名称的存储。(你可以通过 Topology#describe() )您还可以通过 table() 方法应用 Materialized 参数。

    我不太清楚,你说的“访问处理器api中的ktable”是什么意思?如果您的意思是“访问 Processor “你可以用 Topology#connectProcessorAndStateStores() 让处理器访问存储。注意,处理器不应该写入ktable存储,因为 表() 操作员负责维护表的状态。如果您向存储中写入数据,则无法保证,并且在发生故障时可能会丢失数据。