代码之家  ›  专栏  ›  技术社区  ›  Marsellus Wallace

如何重写Spark数据帧scala中的嵌套列?

  •  0
  • Marsellus Wallace  · 技术社区  · 5 年前

    我有一个数据框架,它有多个列,其中一些列是结构。像这样的东西

    root
     |-- foo: struct (nullable = true)
     |    |-- bar: string (nullable = true)
     |    |-- baz: string (nullable = true)
     |-- abc: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- def: struct (nullable = true)
     |    |    |    |-- a: string (nullable = true)
     |    |    |    |-- b: integer (nullable = true)
     |    |    |    |-- c: string (nullable = true)
    

    我想申请 UserDefinedFunction 在柱子上 baz 替换 巴兹 具有以下功能 巴兹 但是我不知道怎么做。以下是所需输出的示例(请注意 巴兹 现在是 int )

    root
     |-- foo: struct (nullable = true)
     |    |-- bar: string (nullable = true)
     |    |-- baz: int (nullable = true)
     |-- abc: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- def: struct (nullable = true)
     |    |    |    |-- a: string (nullable = true)
     |    |    |    |-- b: integer (nullable = true)
     |    |    |    |-- c: string (nullable = true)
    

    看起来像 DataFrame.withColumn 仅适用于顶层列,而不适用于嵌套列。我用scala解决这个问题。

    有人能帮我吗?

    谢谢

    0 回复  |  直到 6 年前
        1
  •  15
  •   Rodrigue    6 年前

    这很简单,只需使用一个点来选择嵌套结构,例如 $"foo.baz" :

    case class Foo(bar:String,baz:String)
    case class Record(foo:Foo)
    
    val df = Seq(
       Record(Foo("Hi","There"))
    ).toDF()
    
    
    df.printSchema
    
    root
     |-- foo: struct (nullable = true)
     |    |-- bar: string (nullable = true)
     |    |-- baz: string (nullable = true)
    
    
    val myUDF = udf((s:String) => {
     // do something with s 
      s.toUpperCase
    })
    
    
    df
    .withColumn("udfResult",myUDF($"foo.baz"))
    .show
    
    +----------+---------+
    |       foo|udfResult|
    +----------+---------+
    |[Hi,There]|    THERE|
    +----------+---------+
    

    如果要将UDF的结果添加到现有结构中 foo ,即获得:

    root
     |-- foo: struct (nullable = false)
     |    |-- bar: string (nullable = true)
     |    |-- baz: string (nullable = true)
     |    |-- udfResult: string (nullable = true)
    

    有两种选择:

    具有 withColumn :

    df
    .withColumn("udfResult",myUDF($"foo.baz"))
    .withColumn("foo",struct($"foo.*",$"udfResult"))
    .drop($"udfResult")
    

    具有 select :

    df
    .select(struct($"foo.*",myUDF($"foo.baz").as("udfResult")).as("foo"))
    

    编辑: 用UDF的结果替换结构中的现有属性: 不幸的是,确实如此 工作:

    df
    .withColumn("foo.baz",myUDF($"foo.baz")) 
    

    但可以这样做:

    // get all columns except foo.baz
    val structCols = df.select($"foo.*")
        .columns
        .filter(_!="baz")
        .map(name => col("foo."+name))
    
    df.withColumn(
        "foo",
        struct((structCols:+myUDF($"foo.baz").as("baz")):_*)
    )