代码之家  ›  专栏  ›  技术社区  ›  JJ15k

卡夫卡分区分配协议

  •  1
  • JJ15k  · 技术社区  · 6 年前

    我正在编写集成测试,以使用confluent dotnet(包装librdkafka)验证卡夫卡生产者-消费者配置。

    在一个测试中,我想启动一个消费者,该消费者将从现有分区的末尾开始,然后发布一条来自生产者的消息,并断言消费者只使用了一条消息。

    现在,使用者的启动是异步的(即:如果您调用subscribe然后立即发布,则从末尾开始的使用者将不会收到它)。 在没有竞争条件的情况下,编写此测试的适当方法是什么?完成“分区.分配”后,是否已确定使用者偏移量?我不确定,因为OnPartitionAssigned的回调只包含TopicPartition,没有偏移量。

    在一个相关的问题上,似乎有时,在没有任何kafka节点故障的情况下(afaict),我收到的分配的分区比分配的分区多(即:我两次分配相同的分区),这怎么可能?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Edenhill    6 年前

    设置OnPartitionEof委托,当使用者到达分区的末尾时将调用该委托,当调用该委托时,您可以确保使用者确实正在获取给定分区的消息,并且可以开始向其生成消息。

            consumer.OnPartitionEOF += (_, end)
                => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");