代码之家  ›  专栏  ›  技术社区  ›  Daniel

目录中的Apache Flink流文件

  •  0
  • Daniel  · 技术社区  · 6 年前

    我想按照给定的顺序将一组csv文件加载到Apache Flink中,例如,根据文件名中的命名方案确定,其中可能包含一些时间戳信息。

    在Apache Spark中,只要将文件移动到特定目录(例如/数据/暂存目录),我就可以将文件流式传输到数据集,原子文件移动如下

    Dataset<Row> fileStreamDf = spark.readStream()
                .option("header", true)
                .schema(schema)
                .csv("/data/staging")
    

    然后,我会按照给定的顺序(例如使用bash脚本)将文件逐个移动到该临时目录。

    如何使用Apache Flink实现同样的效果?

    1 回复  |  直到 6 年前
        1
  •  1
  •   kkrugler    6 年前

    这并不完全是相同的用例,但我们必须在流媒体作业中执行类似的操作(文件是 HDF5 CSV )所以我写了一个 RichSourceFunction 它知道如何按正确的顺序遍历文件,并将文件路径(在S3中)作为字符串记录发出。然后是下游 FlatMapFunction 解析文件并发出实际行。