
Spark Streaming: ERROR StreamingContext: Failed to construct kafka consumer

  •  2
  • maxness  · Tech Community  · 6 years ago

    I am trying to access a Kafka topic using Spark Streaming. I don't think I am missing any dependencies or imports, but when I try to run the code below:

    public static void main(String[] args) {
    
        String URL = "spark://localhost:7077";
    
        SparkConf conf = new SparkConf().setAppName("Kafka-test").setMaster(URL);
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:6667");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "ID1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
    
        Collection<String> topics = Arrays.asList("MAX_LEGO", "CanBeDeleted");
    
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    
        JavaPairDStream<Object, Object> max = stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
        max.count();
        max.print();
    
        ssc.start();
    
    }
    

    I get the following error message:

    18/02/10 16:57:08 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
    org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:536)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
        at org.kafkaConnection2.main(kafkaConnection2.java:50)

    Caused by: org.apache.kafka.common.KafkaException: com.fasterxml.jackson.databind.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:536)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

    My guess is that the bootstrap server may be configured incorrectly, or that something else is going wrong when connecting to Kafka...

    Please bear with what may be a rather silly question; I have only just started working with Spark and Kafka.

    1 Reply  |  as of 6 years ago
        1
  •  2
  •   Kiran Balakrishnan    6 years ago

    Try using org.apache.kafka.common.serialization.StringDeserializer instead of com.fasterxml.jackson.databind.deser.std.StringDeserializer, since you are getting the following exception:

    Caused by: org.apache.kafka.common.KafkaException: com.fasterxml.jackson.databind.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:536)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
        at ...
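
    For reference, here is a minimal sketch of the corrected program, assuming the same app name, topics, and host/port values as in the question. The wrapper class name `KafkaTest` and the trailing `ssc.awaitTermination()` call are additions made only to produce a self-contained, runnable example; the one substantive change is importing Kafka's own StringDeserializer rather than Jackson's.

        import java.util.Arrays;
        import java.util.Collection;
        import java.util.HashMap;
        import java.util.Map;

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        // The fix: Kafka's deserializer, NOT com.fasterxml.jackson.databind.deser.std.StringDeserializer
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.apache.spark.SparkConf;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaInputDStream;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;
        import org.apache.spark.streaming.kafka010.ConsumerStrategies;
        import org.apache.spark.streaming.kafka010.KafkaUtils;
        import org.apache.spark.streaming.kafka010.LocationStrategies;
        import scala.Tuple2;

        public class KafkaTest { // hypothetical class name, for illustration only

            public static void main(String[] args) throws InterruptedException {
                SparkConf conf = new SparkConf().setAppName("Kafka-test").setMaster("spark://localhost:7077");
                JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

                Map<String, Object> kafkaParams = new HashMap<>();
                kafkaParams.put("bootstrap.servers", "localhost:6667");
                // Both entries now resolve to org.apache.kafka.common.serialization.StringDeserializer
                kafkaParams.put("key.deserializer", StringDeserializer.class);
                kafkaParams.put("value.deserializer", StringDeserializer.class);
                kafkaParams.put("group.id", "ID1");
                kafkaParams.put("auto.offset.reset", "latest");
                kafkaParams.put("enable.auto.commit", false);

                Collection<String> topics = Arrays.asList("MAX_LEGO", "CanBeDeleted");

                JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

                stream.mapToPair(record -> new Tuple2<>(record.key(), record.value())).print();

                ssc.start();
                ssc.awaitTermination(); // added: block so the streaming job keeps running after start()
            }
        }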