代码之家  ›  专栏  ›  技术社区  ›  Mustafa

Apache Flink:自定义InputFormat仅以1的并行度运行

  •  1
  • Mustafa  · 技术社区  · 6 年前

    我正在为Apache Flink实现一种自定义输入格式。我创建了一个虚拟输入格式,它返回3行。

    public class ElasticsearchInputFormat extends GenericInputFormat<Row> {
        @Override
        public void configure(Configuration parameters) {
            System.out.println("configuring");
        }
    
        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
            return cachedStatistics;
        }
    
        @Override
        public void open(GenericInputSplit split) throws IOException {
            System.out.println("opening: " + split);
            super.open(split);
        }
    
        @Override
        public void close() throws IOException {
            System.out.println("closing");
            super.close();
        }
    
        private int a = 0;
    
        public boolean reachedEnd() throws IOException {
            a++;
            return a > 3;
        }
    
        public Row nextRecord(Row reuse) throws IOException {
            Row r = new Row(2);
            r.setField(0, "osman");
            r.setField(1, "wow");
            return r;
        }
    }
    

    我的示例代码如下:

    final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
    env.setParallelism(8);
    
    DataSource<Row> input = env.createInput(new ElasticsearchInputFormat());
    
    input.print();
    

    但是,尽管并行度设置为8,但它会打印:

    configuring
    opening: GenericSplit (0/1)
    closing
    osman,wow
    osman,wow
    osman,wow
    

    为什么它没有并行化?我希望有多个拆分,以便其他操作符可以并行使用它。

    1 回复  |  直到 6 年前
        1
  •  2
  •   kkrugler    6 年前

    createCollectionsEnvironment() 返回隐式并行度为1的特殊环境。从Javadocs。。。

    创建使用Java集合的{@link CollectionEnvironment} 在下面这将在当前 JVM。速度非常快,但如果数据不符合 记忆力平行度将始终为1。这在以下情况下很有用 实施和调试。