代码之家  ›  专栏  ›  技术社区  ›  Bankelaal

Spark仅从一个卡夫卡主题读取数据流

  •  0
  • Bankelaal  · 技术社区  · 6 年前

    我有一个spark流媒体应用程序,我必须订阅多个主题来读取和处理数据。现在为了测试,我添加了两个主题。 但我认为下面的代码只从一个主题读取数据。我尝试过从自定义代码发送数据,也尝试过通过卡夫卡消费者批处理文件发送数据。如果我为每个主题创建多个直接流,那么我就能够阅读所有主题。但是我有1000个主题,所以我只想创建一个直接流并提供一个主题集。有人知道我可能做错了什么吗?

    Collection<String> topicList = new ArrayList<>();   
    topicList.add("A");
    topicList.add("B");
    final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<
                        String, String> Subscribe(topicList, kafkaParams));
    
    for(String topic:topicList)
    {
        JavaDStream<ConsumerRecord<String, String>> filteredStreams = stream.filter(msg -> {
            return msg.topic().equals(topic);
        });
    
        JavaPairDStream<String, String> kafkaMessages = filteredStreams.mapToPair(new KafkaConsumerFunction());
        process(kafkaMessages );
    }
    
    0 回复  |  直到 6 年前