I am implementing a custom input format for Apache Flink. As a first step, I created a dummy input format that returns 3 rows:
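import java.io.IOException;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.types.Row;

public class ElasticsearchInputFormat extends GenericInputFormat<Row> {

    // Counts calls to reachedEnd(); each opened split emits 3 rows.
    private int a = 0;

    @Override
    public void configure(Configuration parameters) {
        System.out.println("configuring");
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        System.out.println("opening: " + split);
        super.open(split);
    }

    @Override
    public void close() throws IOException {
        System.out.println("closing");
        super.close();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        a++;
        return a > 3;
    }

    @Override
    public Row nextRecord(Row reuse) throws IOException {
        // Dummy record with two string fields.
        Row r = new Row(2);
        r.setField(0, "osman");
        r.setField(1, "wow");
        return r;
    }
}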
My sample code is as follows:
final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
env.setParallelism(8);
DataSource<Row> input = env.createInput(new ElasticsearchInputFormat());
input.print();
However, even though the parallelism is set to 8, it prints:
configuring
opening: GenericSplit (0/1)
closing
osman,wow
osman,wow
osman,wow
Why is it not parallelized? I would like to have multiple splits so that downstream operators can consume them in parallel.
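
To make clear how I read the `opening: GenericSplit (0/1)` line, here is a minimal plain-Java sketch. It does not use Flink at all: `SplitLabelSketch` and `labels` are hypothetical names, and the sketch assumes that the runtime asks the format for a number of splits and that each split is labelled as (split index/total number of splits).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not a Flink class: it only models split labels.
class SplitLabelSketch {

    // Assumption: one split per requested slot, labelled "(index/total)".
    static List<String> labels(int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be at least 1");
        }
        List<String> result = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            result.add("GenericSplit (" + i + "/" + numSplits + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        // A single requested split yields the one label seen in my output.
        System.out.println(labels(1));
        // Eight requested splits would yield eight distinct labels.
        System.out.println(labels(8));
    }
}
```

If that reading is right, the runtime requested only one split, so the setting of 8 was not applied when the splits were created. The Javadoc of `ExecutionEnvironment.createCollectionsEnvironment()` describes it as running in a single thread on Java collections, so the choice of environment may be the cause; running the same job with `ExecutionEnvironment.getExecutionEnvironment()` or `ExecutionEnvironment.createLocalEnvironment(8)` seems worth checking.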