代码之家  ›  专栏  ›  技术社区  ›  phaigeim

为storm consumer检查卡夫卡主题的偏移量

  •  0
  • phaigeim  · 技术社区  · 6 年前

    我正在使用storm kafka client 1.2.1,并为KafkaTridentSpoutOpaque创建喷口配置,如下所示

                kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
                    .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                    .setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())
    

    我在卡夫卡和Zookeeper中都找不到我的组id和偏移量。通过Zookeeper,我尝试了zkCli。sh和尝试 ls /consumers 但并没有,因为我认为卡夫卡自己现在正在维持偏移量,而不是动物园管理员。

    我也用下面的命令尝试过卡夫卡

    bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand  --list  --bootstrap-server localhost:9092
    Note: This will not show information about old Zookeeper-based consumers.
    console-consumer-20130
    console-consumer-82696
    console-consumer-6106
    console-consumer-67393
    console-consumer-14333
    console-consumer-21174
    console-consumer-64550
    

    有人能帮我找到偏移量吗?如果我重新启动拓扑,它会再次重播卡夫卡中的事件吗?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Stig Rohde Døssing    6 年前

    三叉戟在卡夫卡中不存储偏移量,但在斯托姆的动物园管理员中存储偏移量。如果使用Storm的Zookeeper配置的默认设置运行,则Storm的Zookeeper中的路径如下 /coordinator/<your-topology-id>/meta .

    该路径下的对象将包含第一个和最后一个偏移量,以及每个批的主题分区。因此,例如。 /coordinator/<your-topology-id>/meta/15 将包含批次号15中发出的第一个和最后一个偏移量。

    重新启动后喷口是否重放偏移量由 FirstPollOffsetStrategy 您在 KafkaSpoutConfig 。默认值为 UNCOMMITTED_EARLIEST ,重新启动时不会重新启动。查看Javadoc https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L126 .