代码之家  ›  专栏  ›  技术社区  ›  riyaz-ali

如何在Flink中为Google云存储创建RecoverableWriter

  •  1
  • riyaz-ali  · 技术社区  · 6 年前

    Google Cloud Storage 写作( 下沉 )要素 DataStream 从我的流媒体工作使用 StreamingFileSink .

    为此,我用了 Google Cloud Storage connector 将Hadoop作为 org.apache.hadoop.fs.FileSystem ,并已使用 HadoopFileSystem as an implementation of org.apache.flink.core.fs.FileSystem hadoop文件系统 为弗林克上课。

    我在gradle文件中包含了以下依赖项:

    • compile( "com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2" )
    • "org.apache.flink:flink-connector-filesystem_2.11:1.6.0" )
    • provided( "org.apache.flink:flink-shaded-hadoop2:1.6.0" )

    从我的理解来看 [1] [2] [3] ,Flink动态加载 FileSystemFactory 运行时(通过 java.util.ServiceLoader HadoopFsFactory 运行时(通过 ,如果它在类路径中找到Hadoop),则使用它来创建 FileSystem .

    RecoverableWriter for Hadoop兼容包只支持 hdfs 文件方案(我使用 gs )因此 throws an error at runtime .

    所以,我 extended Hadoop文件系统 GCSFileSystem )以及 @overrided 这个 FileSystem#createRecoverableWriter() RecoverableWriter 然后处理恢复等细节,并创建相应的 文件系统工厂 @AutoService 因此应该被 ServiceLoader ).

    该设置在本地和本地docker集群上运行良好(实际上,GCS连接器由于缺乏授权而抛出错误,但这很好,因为这意味着 文件系统 但当我将它部署到运行在Google计算引擎上的docker集群时,它失败了。

    在GCE上,默认 Hadoop文件系统 加载并在执行方案时引发异常 而不是 高密度光纤

    我在弗林克 作为一个 long running session cluster on Docker using docker-flink

    1 回复  |  直到 6 年前
        1
  •  1
  •   riyaz-ali    6 年前

    答案在 行动计划的一部分!!

    我在一辆卡车上跑 长寿命会话群集 job.jar 被执行死刑 FileSystem 初始化 我增加工作时有人打电话。

    • 独立的: 执行 lib/

    • 群集( manual ): 文件系统 执行 您的目录 zip

    • docker long-living ): 创建一个自定义容器映像并将jar添加到 该图像的目录。

    • 群集( 码头工人 )( per-job-session ): 文件系统 和你的工作等)的 库/ 目录, read more about per-job session here.