我有一个spark流媒体应用程序,我必须订阅多个主题来读取和处理数据。现在为了测试,我添加了两个主题。
但我认为下面的代码只从一个主题读取数据。我尝试过从自定义代码发送数据,也尝试过通过卡夫卡消费者批处理文件发送数据。如果我为每个主题创建多个直接流,那么我就能够阅读所有主题。但是我有1000个主题,所以我只想创建一个直接流并提供一个主题集。有人知道我可能做错了什么吗?
Collection<String> topicList = new ArrayList<>();
topicList.add("A");
topicList.add("B");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<
String, String> Subscribe(topicList, kafkaParams));
for(String topic:topicList)
{
JavaDStream<ConsumerRecord<String, String>> filteredStreams = stream.filter(msg -> {
return msg.topic().equals(topic);
});
JavaPairDStream<String, String> kafkaMessages = filteredStreams.mapToPair(new KafkaConsumerFunction());
process(kafkaMessages );
}