代码之家  ›  专栏  ›  技术社区  ›  Stella

kafka流中的处理器节点

  •  -1
  • Stella  · 技术社区  · 6 年前

    我正在努力 processor node 在卡夫卡溪流中。对于一个简单的代码,我编写如下代码只是为了过滤 UserID ,这是正确的做法吗 处理器节点 在卡夫卡河?

    但是,以下代码未编译,引发错误: The method filter(Predicate<? super Object,? super Object>) in the type KStream<Object,Object> is not applicable for the arguments (new Predicate<String,String>(){})

    KStreamBuilder builder = new KStreamBuilder();
    
    builder.stream(topic)
        .filter(new Predicate <String, String>() {
            //@Override
            public boolean test(String key, String value) {
                Hashtable<Object, Object> message;
                // put you processor logic here
                return message.get("UserID").equals("1");
            }
        })
        .to(streamouttopic);
    
        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);
    
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
           streams.close();
           latch.countDown();
        }
    });
    
    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
    

    有人能指引我吗?

    2 回复  |  直到 6 年前
        1
  •  2
  •   Matthias J. Sax    6 年前

    builder.stream(topic) 退货 KStream<Object,Object> 类型,因为您没有指定泛型类型。和 <Object,Object> 与不兼容 <String,String>

    如果你知道,实际的类型是 KStream<String,String> 可以按以下方式指定类型:

    builder.<Sting,String>stream(topic)
           .filter(...)
    

    回答有关“处理器节点”的问题:是,添加 filter() 将在内部添加处理器节点。请注意,在DSL级别,您通常不需要考虑处理器。

    如果要显式使用处理器,可以使用处理器API而不是DSL。查看WordCount示例: https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

    请注意,使用DSL,代码将在内部转换为处理器拓扑,即Kafka流的运行时模型。

        2
  •  -1
  •   Katya Gorshkova    6 年前

    可能您正在使用 Predicate 从另一个包初始化。您需要使用

    import org.apache.kafka.streams.kstream.Predicate;