代码之家  ›  专栏  ›  技术社区  ›  BnJ

如何知道卡夫卡何时提交记录?

  •  0
  • BnJ  · 技术社区  · 6 年前

    在集成测试的情况下,我将一条记录发送到Kafka,我想知道它将在何时被处理和提交,然后进行断言(而不是使用 Thread.sleep )。。。

    以下是我的尝试:

    public void sendRecordAndWaitUntilItsNotConsumed(ProducerRecord<String, String> record)
          throws ExecutionException, InterruptedException {
    
        RecordMetadata recordMetadata = producer.send(record).get();
        TopicPartition topicPartition = new TopicPartition(recordMetadata.topic(),
            recordMetadata.partition());
    
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
    
          consumer.assign(Collections.singletonList(topicPartition));
    
          long recordOffset = recordMetadata.offset();
          long currentOffset = getCurrentOffset(consumer, topicPartition);
    
          while (currentOffset <= recordOffset) {
            currentOffset = getCurrentOffset(consumer, topicPartition);
            LOGGER.info("Waiting for message to be consumed - Current Offset = " + currentOffset
                + " - Record Offset = " + recordOffset);
          }
        }
      }
    
      private long getCurrentOffset(KafkaConsumer<String, String> consumer,
          TopicPartition topicPartition) {
    
        consumer.seekToEnd(Collections.emptyList());
    
        return consumer.position(topicPartition);
      }
    

    但它不起作用。实际上,我禁用了消费者的提交,它不会循环 Waiting for message to be consumed...

    1 回复  |  直到 6 年前
        1
  •  0
  •   BnJ    6 年前

    它使用 KafkaConsumer#committed 而不是 KafkaConsumer#position

    private void sendRecordAndWaitUntilItsNotConsumed(ProducerRecord<String, String> record) throws InterruptedException, ExecutionException {
    
            RecordMetadata recordMetadata = producer.send(record).get();
    
            TopicPartition topicPartition = new TopicPartition(recordMetadata.topic(),
                    recordMetadata.partition());
    
            consumer.assign(Collections.singletonList(topicPartition));
    
            long recordOffset = recordMetadata.offset();
            long currentOffset = getCurrentOffset(topicPartition);
    
            while (currentOffset < recordOffset) {
                currentOffset = getCurrentOffset(topicPartition);
                LOGGER.info("Waiting for message to be consumed - Current Offset = " + currentOffset
                        + " - Record Offset = " + recordOffset);
                TimeUnit.MILLISECONDS.sleep(200);
            }
        }
    
        private long getCurrentOffset(TopicPartition topicPartition) {
            OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
            return offsetAndMetadata != null ? offsetAndMetadata.offset() - 1 : -1;
        }