
Getting "java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign" when trying to consume using Flink's Kafka consumer

  • skrshn  · 6 years ago

    I am trying to write a Kafka consumer that reads data from a topic, but every time I run it I get the following error:

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:39)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:391)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:229)
    

    The Java class is:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    
    
    public class KafkaConsumer {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
            DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<String>("rdf-new", new SimpleStringSchema(), parameterTool.getProperties()));
    
            stream.print();
            env.execute();
        }
    }
    

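    I created a standalone project in IntelliJ (with its own pom) using the same code, and it runs fine. But since I need the code in another project, I created a new Maven module in that existing project and tried to run it there, and now it shows this error.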

    The dependencies in the Maven module's pom.xml are:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.4.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    

    The only thing I noticed is that in the Maven module the reported Kafka version is 1.1.0, even though the pom declares the Kafka connector "flink-connector-kafka-0.9_2.11":

    2018-05-18 11:14:56,105 - AbstractConfig                           [WARN] - ConsumerConfig            - The configuration 'zookeeper.connect' was supplied but isn't a known config. 
    2018-05-18 11:14:56,105 - AppInfoParser$AppInfo                    [INFO] - AppInfoParser             - Kafka version : 1.1.0                              
    2018-05-18 11:14:56,105 - AppInfoParser$AppInfo                    [INFO] - AppInfoParser             - Kafka commitId : fdcf75ea326b8e07  
    

    In the standalone project (where the consumer works fine), the Kafka version is 0.9.0.1:

    11:32:19,537 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration zookeeper.connect = localhost:2181 was supplied but isn't a known config.
    11:32:19,537 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.9.0.1
    11:32:19,538 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : 23c69d62a0cabf06
    

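    It would be a huge help if someone could tell me what the problem might be. It could be caused by the dependencies in the pom file, but the standalone project uses the same dependencies as the ones I gave above. Thanks in advance.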

    1 answer

  •   Michal Borowiecki    6 years ago

    As you have already spotted, the problem is that in your module the Kafka client version (1.1.0) does not match the version the Flink connector expects (0.9).

    To find out where the kafka-clients dependency version comes from, run this on the command line:

    mvn dependency:tree
    
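
    The dependency tree shows what Maven resolves; to double-check what the JVM actually loads at runtime, a small reflection probe can help. The following is only a sketch and not part of the original answer: the class name ClasspathProbe and its two helper methods are hypothetical names chosen for illustration. As far as I know, kafka-clients 0.9 declares assign(java.util.List) while later versions declare assign(java.util.Collection), which would explain the NoSuchMethodError above; running the probe inside the failing module should show which signature is present.

```java
import java.security.CodeSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not from the original post): reports where a class
// was loaded from and which overloads of a given method it exposes.
public class ClasspathProbe {

    /** Returns the jar/directory a class was loaded from, or a placeholder if the JVM does not expose it. */
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null)
                ? "(bootstrap/unknown)"
                : src.getLocation().toString();
    }

    /** Lists, in sorted order, the parameter types of every public method with the given name. */
    static List<String> signaturesOf(Class<?> cls, String methodName) {
        return Arrays.stream(cls.getMethods())
                .filter(m -> m.getName().equals(methodName))
                .map(m -> Arrays.stream(m.getParameterTypes())
                        .map(Class::getTypeName)
                        .collect(Collectors.joining(", ", methodName + "(", ")")))
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Defaults target the class and method named in the stack trace.
        String className = args.length > 0 ? args[0] : "org.apache.kafka.clients.consumer.KafkaConsumer";
        String methodName = args.length > 1 ? args[1] : "assign";

        // Throws ClassNotFoundException if the class is not on the classpath.
        Class<?> cls = Class.forName(className);
        System.out.println(className + " loaded from " + locationOf(cls));
        signaturesOf(cls, methodName).forEach(System.out::println);
    }
}
```

    Run it with the same classpath as the failing module (for example from the IDE inside that module). If the printed location points at a kafka-clients jar other than 0.9.0.1, the version override described next is the likely fix.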

    In the module's POM you can add a dependencyManagement section that overrides the kafka-clients library version with the one you need, like this:

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.1</version>
            </dependency>
        </dependencies>
    </dependencyManagement>