
Cannot access S3 from a Flink job on EMR

Phil · 6 years ago

    I am struggling to access S3 from a Flink job.

    If I submit my assembled jar for the job, I get an Access Denied error:

    Caused by: 
    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
    Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: ...; S3 Extended Request ID: ...), S3 Extended Request ID: ...
    

    This is my setup: the EMR cluster is created via "advanced configuration" with Flink 1.4.0 and Hadoop 2.8.3 as applications, 1x master node and 2x slave nodes.

    The instances run with the EMR_EC2_DefaultRole, which has the AmazonElasticMapReduceforEC2Role policy attached and therefore full S3 access. In fact, I can successfully issue these commands on both the master and the slave nodes:

    aws s3api list-buckets

    hdfs dfs -ls s3://bucketA

    I connect to the master and start a Flink session cluster on YARN:

    /usr/lib/flink/bin/yarn-session.sh -n 2 -d
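
    For reference, I then submit the assembled jar to that session roughly like this (the jar path below follows sbt-assembly's default naming and may differ from an actual build):

    /usr/lib/flink/bin/flink run -c TestS3 target/scala-2.11/TestS3-assembly-0.1.jar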

    The Flink job reads the source file from the bucket:

    import org.apache.flink.api.scala._

    object TestS3 {
      def main(args: Array[String]): Unit = {
        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

        // Read the source file from bucketA ...
        val input: DataSet[String] = env.readTextFile("s3://bucketA/source/file")
        // ... and write it back out, unchanged, to bucketB.
        input.writeAsText("s3://bucketB/delete/me/later")

        env.execute()
      }
    }
    

    This is my simple build.sbt:

    name := "TestS3"
    scalaVersion := "2.11.11"
    version := "0.1"
    val flinkVersion = "1.4.0"
    libraryDependencies ++= Seq(
        "org.apache.flink" % "flink-core" % flinkVersion % "provided",
        "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
    )
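
    The assembled jar mentioned above is produced with sbt assembly; a minimal project/plugins.sbt for that could look like this (the plugin version is an assumption from that era, not taken from my actual build):

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")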
    

    The bucket has no policy denying read access. It does have a policy denying deletes, but that should not affect the Flink job. The EMR_EC2_DefaultRole allows full access to S3.

    As always, any hint at what I am doing wrong is greatly appreciated. Or are my expectations wrong?!

    This is the full stack trace:

    java.io.IOException: Error opening the Input Split s3://bucketA/source/file [0,-1]: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: xyz_request_id; S3 Extended Request ID: xyz_ext_request_id), S3 Extended Request ID: xyz_ext_request_id
        at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: xyz_request_id; S3 Extended Request ID: xyz_ext_request_id), S3 Extended Request ID: xyz_ext_request_id
        at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:434)
        at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:461)
        at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1097)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
        at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
    Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: xyz_request_id; S3 Extended Request ID: xyz_ext_request_id), S3 Extended Request ID: xyz_ext_request_id
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
        at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
        ... 7 more
    
    1 Answer

    Phil · 6 years ago

    We found out that the source bucket is encrypted with a KMS key. Full S3 access is therefore not enough for the EMR_EC2_DefaultRole; it also needs access to the KMS key. We extended the EMR_EC2_DefaultRole accordingly, and the Flink job can now access the file.
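
    For anyone debugging the same symptom: whether an object is SSE-KMS encrypted can be checked directly with the CLI (bucket and key as in the job above):

    aws s3api head-object --bucket bucketA --key source/file

    If the output contains "ServerSideEncryption": "aws:kms" and an SSEKMSKeyId, the instance role also needs permissions on that key. A minimal sketch of the kind of extra inline policy we attached (policy name, key ARN, and the exact action list are illustrative placeholders; kms:Decrypt is the minimum required for reads):

    aws iam put-role-policy \
      --role-name EMR_EC2_DefaultRole \
      --policy-name AllowKmsForS3Source \
      --policy-document '{
        "Version": "2012-10-17",
        "Statement": [{
          "Effect": "Allow",
          "Action": ["kms:Decrypt", "kms:DescribeKey"],
          "Resource": "arn:aws:kms:<region>:<account-id>:key/<key-id>"
        }]
      }'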

    Maybe this post saves somebody some time (don't forget about KMS encryption).