我使用的是处理器api(papi)拓扑。
是否可以从处理器api中访问使用dsl创建的ktable(或globalktable)(即使是只读的)?
即使用:
val builder = new StreamsBuilder()
val KTable = builder.table("topicname")
我有一个ktable,但是拓扑结构只允许您使用
addStateStore
有一个仓库建造者,而不是KTable本身。
.addStateStore(myStoreBuilder, MY_PROCESSOR_NAME)
所以我可以这样做:
def keyValueStoreBuilder[K, V](storeName: String, keySerde: Serde[K], valueSerde: Serde[V]): StoreBuilder[KeyValueStore[K, V]] = {
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
keySerde,
valueSerde)
}
但是,如何干净地获得
storeName
在这种情况下?