我有一个混合和匹配的Scala拓扑结构,其中主要工作是PAPI处理器,其他部分通过DSL连接。
EventsProcessor:
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)
整个主题的数据(包括原始数据
eventsTopic
)是通过一个分区的,我们称之为
DoubleKey
有两个字段。
访客被送到
visitorsTopic
通过水槽:
.addSink(VISITOR_SINK_NAME, visitorTopicName,
DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
在DSL中,我在这个主题上创建了一个KV KTable:
val visitorTable = builder.table(
visitorTopicName,
Consumed.`with`(DoubleKey.getKafkaSerde(),
Visitor.getKafkaSerde()),
Materialized.as(visitorStoreName))
我后来连接到
EventProcessor
:
topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
所有的东西都是共分的(通过DoubleKey)。
visitorSinkPartitioner
执行典型的模运算:
Math.abs(partitionKey.hashCode % numPartitions)
在PAPI处理器EvestsPalm中,我查询这个表,看看是否已经存在访问者。
但是,在我的测试中(使用
EmbeddedKafka
,但这不应该有什么区别),如果我用一个分区运行它们,一切都很好(EventsProcessor检查同一个分区上两个事件的KTable
双键
在第二个事件中——有一些延迟——它可以看到存在。
Visitor
但如果我用更高的数字运行它,EventProcessor将永远看不到存储中的值。
但是,如果我通过API检查存储(迭代
store.all()
),记录就在那里。所以我知道它一定要去不同的分区。
由于KTable应该在其分区上的数据上工作,并且所有内容都被发送到同一分区(使用显式分区器调用同一代码),KTable应该在同一分区上获取该数据。
我的假设正确吗?会发生什么?
KafkaStreams 1.0.0和Scala 2.12.4。
当然,做
put
在PAPI上通过PAPI而不是
StreamsBuilder.table()
,因为这肯定会使用运行代码的相同分区,但这是不可能的。