代码之家  ›  专栏  ›  技术社区  ›  Lage Ragnarsson

Flink Kafka源时间戳提取器的类加载

  •  2
  • Lage Ragnarsson  · 技术社区  · 6 年前

    我正在尝试将Flink作业部署到基于 flink:1.4.1-hadoop27-scala\u 2.11-alpine 形象作业使用的是卡夫卡连接器源(flink-connector-Kafka-0.11),我试图为其分配时间戳和水印。我的代码与中的Scala示例非常相似 Flink Kafka connector documentation 。但FlinkKafkaConsumer011

    val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
    myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
    

    这在从IDE本地运行时非常有效。但是,在集群环境中,我遇到以下错误:

    java.lang.ClassNotFoundException: com.my.organization.CustomWatermarkEmitter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1863)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2037)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
    

    我正在将我的工作构建为一个胖罐子,我已经验证它包含这个类。 文档中的这个示例是否仅在CustomWatermarkEmitter类位于/opt/flink/lib/文件夹中时有效?

    这就是我解决问题的方法。但是必须单独构建这个类并将其放置在/opt/flink/lib中,这会使我的构建过程变得非常复杂,所以我想知道这是应该解决的方法还是有其他方法可以解决这个问题?

    例如 Flink documentation 提示必须手动提供一些源UserCodeClassLoader?包括提供的卡夫卡来源?

    据我在org中所见,它似乎在内部使用了“userCodeClassLoader”。阿帕奇。弗林克。流式处理。连接器。卡夫卡。内部构件。AbstractFetcher:

                case PERIODIC_WATERMARKS: {
                for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
    
                    AssignerWithPeriodicWatermarks<T> assignerInstance =
                            watermarksPeriodic.deserializeValue(userCodeClassLoader);
    
                    KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
                            new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                                    partitionEntry.getKey(),
                                    kafkaHandle,
                                    assignerInstance);
    
                    partitionState.setOffset(partitionEntry.getValue());
    
                    partitionStates.add(partitionState);
                }
    

    编辑:

    我创建了一个简单的项目,可以在此处重新发布此问题: https://github.com/lragnarsson/flink-kafka-classpath-problem

    为了复制,您需要docker和docker compose。

    只需执行以下操作:

    1. git克隆 https://github.com/lragnarsson/flink-kafka-classpath-problem.git
    2. cd flink kafka类路径问题/docker
    3. docker撰写构建
    4. docker合成
    5. 在浏览器中转到localhost:8081
    6. 提交target/scala-2.11/flink-kafka-classpath-problem-assembly-0.1-SNAPSHOT中包含的jar文件。罐子

    这将导致例外情况 Java语言lang.ClassNotFoundException:se。拉格纳松。拉格。MyTimestampExtractor

    1 回复  |  直到 6 年前
        1
  •  1
  •   Dawid Wysakowicz    6 年前

    我想您偶然发现了Flink 1.4.1中引入的一个bug: https://issues.apache.org/jira/browse/FLINK-8741

    它将在1.4.2中很快修复。您可以尝试在1.4.2中进行测试。rc2: https://github.com/apache/flink/tree/release-1.4.2-rc2