代码之家  ›  专栏  ›  技术社区  ›  Soheil Pourbafrani

Apache Flink-org的类文件。阿帕奇。弗林克。流式处理。api。斯卡拉。找不到数据流

  •  4
  • Soheil Pourbafrani  · 技术社区  · 6 年前

    使用Apache Flink版本1.3.2和Cassandra 3.11,我编写了一个简单的代码,使用Apache Flink Cassandra连接器将数据写入Cassandra。代码如下:

    final Collection<String> collection = new ArrayList<>(50);
            for (int i = 1; i <= 50; ++i) {
                collection.add("element " + i);
            }
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Tuple2<UUID, String>> dataStream = env
                    .fromCollection(collection)
                    .map(new MapFunction<String, Tuple2<UUID, String>>() {
    
                        final String mapped = " mapped ";
                        String[] splitted;
    
                        @Override
                        public Tuple2<UUID, String> map(String s) throws Exception {
                            splitted = s.split("\\s+");
                            return new Tuple2(
                                    UUID.randomUUID(),
                                    splitted[0] + mapped + splitted[1]
                            );
                        }
                    });
            dataStream.print();
            CassandraSink.addSink(dataStream)
                    .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
                    .setHost("127.0.0.1")
                    .build();
            env.execute();
    

    尝试使用Apache Flink 1.4.2(1.4.x)运行相同的代码时,出现错误:

    Error:(36, 22) java: cannot access org.apache.flink.streaming.api.scala.DataStream
      class file for org.apache.flink.streaming.api.scala.DataStream not found
    

    在线上

    CassandraSink.addSink(dataStream)
                        .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
                        .setHost("127.0.0.1")
                        .build();
    

    我认为Apache Flink 1.4.2中存在一些依赖性更改,这导致了问题。

    我使用代码中导入的以下依赖项:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
    

    如何解决Apache Flink版本1.4.2中的错误?

    更新时间: 在Flink 1.3.2中 org.apache.flink.streaming.api.scala.DataStream<T> 在Java文档中,但在版本1.4.2中没有这样的类。看见 here

    我在Cassandra connector的Flink 1.4.2文档中尝试了代码示例,但我得到了相同的错误,但该示例适用于Flink 1.3.2依赖项!

    1 回复  |  直到 6 年前
        1
  •  10
  •   Alex    6 年前

    除所有其他依赖项外,请确保您具有Flink Scala依赖项:

    专家

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.4.2</version>
    </dependency>
    

    格拉德尔

    dependencies {
        compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.4.2'
    ..
    }
    

    我设法让您的示例使用以下依赖项:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
    

    专家

    <dependencies>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_2.11</artifactId>
            <version>1.4.2</version>
        </dependency>
    
    </dependencies>