代码之家  ›  专栏  ›  技术社区  ›  Ryuzaki L

如何获取卡夫卡消费者id进行日志记录

  •  1
  • Ryuzaki L  · 技术社区  · 6 年前

    在我的应用程序中,我使用 spring-kafka 使用来自kafka服务器的消息,但从控制台消费者获得 consumer-id 处于活动状态的所有使用者线程的

    TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
    easytest-events    9          247367          247367          0             p3-S14-0-e6a1d3cb-8ab3-435f-9f53-5081a6e8f812 /10.66.56.129   p3-S14-0
    

    有没有办法 消费者id 通过代码,我可以比较它们

    2 回复  |  直到 6 年前
        1
  •  3
  •   Gary Russell    6 年前

    消费者id似乎是附加了UUID的客户机id,因此您可以只使用客户机id(可以设置为任何您想要的)。Spring将添加-0、-1等。

    您可以在分配分区时看到日志中的线程数。。。

    2018-08-31 09:34:27.869  INFO 55748 --- [o52105744-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-0]
    2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-3]
    2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-2]
    2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-1]
    2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-4]
    2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-7]
    2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-6]
    2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-5]
    2018-08-31 09:34:27.877  INFO 55748 --- [o52105744-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-8]
    2018-08-31 09:34:27.877  INFO 55748 --- [o52105744-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-9]
    
        2
  •  2
  •   Rahul Gupta    5 年前

    我还有一个类似的用例,我想获取用于登录的消费者Id,所以我使用了 当前线程名称 而且似乎效果不错。

    LOGGER.info("Current thread: {}",Thread.currentThread().getName());
    

    这张照片显示:-

    Current thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
    Current thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1
    Current thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1
    

    这也清楚地表明,消息每次都被不同的线程使用(可能是循环方式)