同学们,今天好。
我有一个卡夫卡项目,用春季卡夫卡听一个明确的话题。
我一天需要听一次所有的信息,把它们放到一个集合中,在那里找到特定的信息。
我无法理解如何在一个@kafkalistener方法中读取所有消息。
我的班级是:
@Component
public class KafkaIntervalListener {
public CountDownLatch intervalLatch = new CountDownLatch(1);
private final SCDFRunnerService scdfRunnerService;
public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
this.scdfRunnerService = scdfRunnerService;
}
@KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
System.out.println("Recieved interval message: " + event);
IntervalType type = event.getType();
Instant instant = event.getInterval();
List<IntervalEvent> events = new ArrayList<>();
events.add(event);
events.size();
this.intervalLatch.countDown();
}
}
我的活动集合的大小始终为1;
我尝试使用不同的循环,但是之后,我的收集被归档为同一条消息的530000倍。
更新:
我已经找到了一种使用factory.setBatchListener的方法(true);但我需要找到使用@scheduled启动它(cron=“$kafka.cron”,zone=“欧洲/莫斯科”)。现在这个方法总是在听。现在我尝试这样的方法:
@Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
kafkaIntervalListener.intervalLatch.await();
}
它不起作用,在调试模式下,我的断点永远不会在这个站点上起作用。