代码之家  ›  专栏  ›  技术社区  ›  markwatson

AWS Glue:如何在输出中添加具有源文件名的列?

  •  7
  • markwatson  · 技术社区  · 6 年前

    有人知道如何将源文件名作为列添加到粘合作业中吗?

    我们创建了一个流,在其中我们对S3中的一些文件进行了爬网,以创建一个模式。然后,我们编写了一个将文件转换为新格式的作业,并将这些文件作为CSV写回另一个S3存储桶,供管道的其余部分使用。我们想要做的是访问某种类型的作业元属性,以便我们可以向包含原始文件名的输出文件中添加一个新列。

    我查看了AWS文档和AWS glue-libs源代码,但没有看到任何跳出的内容。理想情况下,可以通过某种方式从 awsglue.job 包(我们使用的是python风格)。

    我还在学胶水,所以如果我使用了错误的术语,我深表歉意。我还用spark标签给它贴上了标签,因为我相信这就是胶水在封面下使用的东西。

    3 回复  |  直到 6 年前
        1
  •  4
  •   Sandeep Fatangare    6 年前

    您可以在etl工作中使用spark:

    var df = glueContext.getCatalogSource(
      database = database,
      tableName = table,
      transformationContext = s"source-$database.$table"
    ).getDynamicFrame()
     .toDF()
     .withColumn("input_file_name", input_file_name())
    
    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map(
        "path" -> args("DST_S3_PATH")
      )),
      transformationContext = "",
      format = "parquet"
    ).writeDynamicFrame(DynamicFrame(df, glueContext))
    

    请记住,它仅适用于getCatalogSource()API,而不适用于create\u dynamic\u frame\u from\u options()

        2
  •  4
  •   JcMaco    6 年前

    使用AWS Glue Python自动生成的脚本,我添加了以下几行:

    from pyspark.sql.functions import input_file_name
    
    ## Add the input file name column
    datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())
    
    ## Convert DataFrame back to DynamicFrame
    datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")
    

    然后,在 ApplyMapping datasink 您引用的部分代码 datasource2

        3
  •  1
  •   Amiri    4 年前

    我使用的是AWS Glue Python自动生成的脚本。我尝试使用 JcMaco 因为这正是我所需要的,这是一个非常简单的解决方案 input_file_name()

    然而,我无法做到这一点,我的专栏除了标题之外总是空的,但我 能够获取粘合作业的名称,并将其用作新列中的常量,其用途与 input\u file\u name() 在我的这个特定用例中。

    如果查看脚本的左上角,您将看到 args 参数 要访问JOB\u NAME,如下所示。

    我是如何做到的:

    from pyspark.sql.functions import *
    
    job_name = args['JOB_NAME'] # define new variable
    

    (JOB\u名称作为命令行参数传入。)

    然后,在 datasource0 在脚本中定义,使用 job_name 以及 lit 功能:

    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = […] , transformation_ctx = "applymapping1") 
    applymapping2 = applymapping1.toDF().withColumn("job_name", lit(job_name))
    applymapping3 = applymapping1.fromDF(applymapping2, glueContext, "applymapping3")
    

    在上面的示例中,您将更改 frame 中的参数 datasink 定义为 applymapping3