代码之家  ›  专栏  ›  技术社区  ›  Punter Vicky

春季卡夫卡消费者-打印卡夫卡滞后信息

  •  1
  • Punter Vicky  · 技术社区  · 7 年前

    我创建了一个spring kafka消费者,它从一个主题中阅读。有没有类似于打印分区信息的方式来打印延迟信息?

    2 回复  |  直到 7 年前
        1
  •  5
  •   Felipe Dias    5 年前

    虽然没有提供源代码,但我假设您通过 注释。 我已经克服了您描述的使用 组织。阿帕奇。卡夫卡。客户。消费者消费者 here . 它可以在 注释。 指标() 方法,其中包含存储在 记录最大滞后 所有物

    private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);
    
    @KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
    public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
            Consumer<?, ?> consumer) {
    
    
    
        String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
                .map(Metric::metricValue).map(Object::toString).distinct()
                .collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));
    
    
        LOGGER.info(lag);
    
    
    }
    

    在这种情况下,我明确选择了 记录滞后最大值 Confluent Docs .

    上述代码段将具有以下输出: [Kafka current consumer lag] X records 其中X是该窗口中任何分区的记录数的最大延迟。

    我正在使用版本 2.3.3.释放 图书馆

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.3.RELEASE</version>
    </dependency>
    
        2
  •  2
  •   Gary Russell    7 年前

    有一个命令行工具。。。

    $ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group myGroup
    
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    myTopic                        0          66              66              0          -                                           
    

    Process process = new ProcessBuilder()
            .command("/usr/local/bin/kafka-consumer-groups", "--bootstrap-server", "localhost:9092",
                    "--describe", "--group", "siTestGroup")
            .start();
    InputStream inputStream = process.getInputStream();
    process.waitFor(10, TimeUnit.SECONDS);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    FileCopyUtils.copy(inputStream, baos);
    System.out.println(new String(baos.toByteArray()));