我正在开发一个产品,它可以根据用户使用产品的方式添加/删除消费者组。
enable.auto.commit
在我们的产品中关闭,而是在每次收到数据后提交偏移量。
我们最近实施了一项服务,将暂停/恢复产品。卡夫卡图书馆
NodeJS
唯一的问题发生在添加新的消费者组时。首先,让我解释一下我看到的行为:
以下是消费者“group1”信息。。
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group philz-topic-group1
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
philz-topic 1 33 33 0 rdkafka-3ac4d56e-e94b-4365-9af7-04e485502b5d /10.233.113.109 rdkafka
philz-topic 4 34 34 0 rdkafka-d642805c-f5ea-4450-9cb0-3272fcbbffc9 /10.233.88.251 rdkafka
philz-topic 0 23 23 0 rdkafka-12cfca8b-fd61-4a68-bc5f-1946c8ef4eb1 /10.233.120.55 rdkafka
philz-topic 2 26 26 0 rdkafka-7561ca2a-9894-4a3d-83fe-d379bbe64fdf /10.233.126.40 rdkafka
philz-topic 3 20 20 0 rdkafka-cd9d5ed6-7daa-4b75-8f39-6704c8d887ed /10.233.119.133 rdkafka
这是消费者的“group2”信息。。消费者“group2”刚刚添加并完成了一个操作。因此,单个操作的当前偏移量和滞后量已更新。
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group philz-topic-group2
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
philz-topic 3 - 20 - rdkafka-b56306e1-b4b7-43fe-a604-ab7c12f70e9f /10.233.119.133 rdkafka
philz-topic 1 - 33 - rdkafka-76c9a4d2-268b-4ebb-94a8-f1230c9bbfea /10.233.113.109 rdkafka
philz-topic 4 34 34 0 rdkafka-d412e574-8241-48c6-af26-c50be44eb51d /10.233.126.40 rdkafka
philz-topic 0 - 23 - rdkafka-33179a7d-cb9f-453a-83c6-e7e4780372b6 /10.233.88.251 rdkafka
philz-topic 2 - 26 - rdkafka-77506e87-b666-4c92-82df-82071e2ff801 /10.233.120.55 rdkafka
我目前面临的问题是,当暂停/恢复操作发生时,消费组的所有分区都没有更新的当前偏移量和延迟,当取消订阅/暂停并完成操作时,一个分区现在应该有1的延迟。但是,如果一个新的使用者组对于给定的分区没有任何先前的当前偏移量和滞后量,那么该信息现在被跳过,使用者组也看不到。
我对卡夫卡不是很熟悉,所以这里的任何行为解释都是值得赞赏的。
我猜是因为我们的承诺抵消了我们自己
当一个操作发生时,我们能够看到新使用者组的一些信息,但是只能看到一个分区(刚刚接收到数据的分区)显示出来并用当前偏移量更新。
谢谢!
编辑: