代码之家  ›  专栏  ›  技术社区  ›  Averell

ApacheFlink-S3文件夹监视-丢失了许多文件

  •  0
  • Averell  · 技术社区  · 6 年前

    大家好,

    我有一个Flink作业,它有一个S3文件夹作为源,我们不断地将数千个小的(大约每个1Kb)gzip文件放入该文件夹,速度大约为每分钟5000个文件。 以下是我在scala中创建该源的方法:

        val my_input_format = new TextInputFormat(
            new org.apache.flink.core.fs.Path(my_path))
        my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
        my_input_format.setNestedFileEnumeration(true)
    
        val my_raw_stream = streamEnv
                .readFile(my_input_format,
                    my_path,
                    FileProcessingMode.PROCESS_CONTINUOUSLY,
                    1000)
    

    问题是,如上所述,监测间隔为1000毫秒,约有20%的文件丢失。从ApacheFlink Dashboard,在随后的操作程序中,我只能看到大约80%的记录文件总数(“记录已发送”列)。

    如果我增加监视间隔,丢失的文件数量就会减少。在5000毫秒时,约为10%,而在30000毫秒时,只有约2%的数据丢失。

    但未记录任何警告/错误。

    我无法在HDFS中模拟这一点,因为在我们的集群中无法达到如此高的文件写入速度。

    有人能帮忙吗? 非常感谢。

    1 回复  |  直到 6 年前
        1
  •  1
  •   Averell    6 年前

    AmazonS3为列出目录提供了最终的一致性(请参见 this question )

    监控源列出目录中的文件,并通过记住文件的最大修改时间戳来跟踪它处理的文件。由于S3列表不能保证立即一致,因此最大修改时间戳可能会提前,时间戳较小的文件可能会丢失。

    我认为增加监测间隔不能完全解决这个问题。相反,我们需要一个额外的参数来添加最大时间戳的偏移量。如果你能通过 mailing list 或者打开一个 Jira ticket .

    =======更新===========

    我已经按照法比安的建议实施了变更。功能方面,它已完成并工作。需要花费更多的时间来编写适当的单元测试/文档。 My implementation is here