代码之家  ›  专栏  ›  技术社区  ›  xmar

卡夫卡流:混合和匹配PAPI和DSL KTable不可共分

  •  0
  • xmar  · 技术社区  · 6 年前

    我有一个混合和匹配的Scala拓扑结构,其中主要工作是PAPI处理器,其他部分通过DSL连接。

    EventsProcessor: 
    INPUT: eventsTopic
    OUTPUT: visitorsTopic (and others)
    

    整个主题的数据(包括原始数据 eventsTopic )是通过一个分区的,我们称之为 DoubleKey 有两个字段。 访客被送到 visitorsTopic 通过水槽:

    .addSink(VISITOR_SINK_NAME, visitorTopicName,
        DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
    

    在DSL中,我在这个主题上创建了一个KV KTable:

    val visitorTable = builder.table(
      visitorTopicName,
      Consumed.`with`(DoubleKey.getKafkaSerde(),
      Visitor.getKafkaSerde()),
      Materialized.as(visitorStoreName))
    

    我后来连接到 EventProcessor :

    topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
    

    所有的东西都是共分的(通过DoubleKey)。 visitorSinkPartitioner 执行典型的模运算:

    Math.abs(partitionKey.hashCode % numPartitions)
    

    在PAPI处理器EvestsPalm中,我查询这个表,看看是否已经存在访问者。

    但是,在我的测试中(使用 EmbeddedKafka ,但这不应该有什么区别),如果我用一个分区运行它们,一切都很好(EventsProcessor检查同一个分区上两个事件的KTable 双键 在第二个事件中——有一些延迟——它可以看到存在。 Visitor 但如果我用更高的数字运行它,EventProcessor将永远看不到存储中的值。

    但是,如果我通过API检查存储(迭代 store.all() ),记录就在那里。所以我知道它一定要去不同的分区。

    由于KTable应该在其分区上的数据上工作,并且所有内容都被发送到同一分区(使用显式分区器调用同一代码),KTable应该在同一分区上获取该数据。

    我的假设正确吗?会发生什么?

    KafkaStreams 1.0.0和Scala 2.12.4。

    当然,做 put 在PAPI上通过PAPI而不是 StreamsBuilder.table() ,因为这肯定会使用运行代码的相同分区,但这是不可能的。

    1 回复  |  直到 6 年前
        1
  •  1
  •   xmar    6 年前

    是的,假设是正确的。

    如果它能帮助任何人:

    将分区程序传递到Scala EmbeddedKafka库时遇到问题。在其中一个测试套件中,它做得不对。 现在,遵循重构的健康实践,我在这个拓扑的所有套件中都使用了这个方法。

    def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) : 
        EmbeddedKafkaConfig = {
        val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
            classOf[DoubleKeyPartitioner].getCanonicalName)
        EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, 
            customProducerProperties = producerProperties)
    }