同学们,今天好。 我有一个卡夫卡项目,用春季卡夫卡听一个明确的话题。 我一天需要听一次所有的信息,把它们放到一个集合中,在那里找到特定的信息。 我无法理解如何在一个@kafkalistener方法中读取所有消息。 我的班级是:

@Component
public class KafkaIntervalListener {

    public CountDownLatch intervalLatch = new CountDownLatch(1);
    private final SCDFRunnerService scdfRunnerService;

    public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
        this.scdfRunnerService = scdfRunnerService;
    }

    @KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
    public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
        System.out.println("Recieved interval message: " + event);
        IntervalType type = event.getType();
        Instant instant = event.getInterval();
        List<IntervalEvent> events = new ArrayList<>();
        events.add(event);
        events.size();


        this.intervalLatch.countDown();
    }

}

我的活动集合的大小始终为1; 我尝试使用不同的循环,但是之后,我的收集被归档为同一条消息的530000倍。

更新: 我已经找到了一种使用factory.setBatchListener的方法(true);但我需要找到使用@scheduled启动它(cron=“$kafka.cron”,zone=“欧洲/莫斯科”)。现在这个方法总是在听。现在我尝试这样的方法:

    @Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
    kafkaIntervalListener.intervalLatch.await();
}

它不起作用,在调试模式下,我的断点永远不会在这个站点上起作用。

1 回复  |  直到 6 年前
    1
  •  2
  •   Gary Russell    6 年前

    Consumer poll()