代码之家  ›  专栏  ›  技术社区  ›  Raphael Roth

在Spark中创建给定架构的空数组列

  •  6
  • Raphael Roth  · 技术社区  · 6 年前

    由于实木地板不能分割空数组,所以在编写表之前,我用空数组替换了空数组。现在,当我看桌子的时候,我想做相反的事情:

    我有一个具有以下模式的数据帧:

    |-- id: long (nullable = false)
     |-- arr: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- x: double (nullable = true)
     |    |    |-- y: double (nullable = true)
    

    以及以下内容:

    +---+-----------+
    | id|        arr|
    +---+-----------+
    |  1|[[1.0,2.0]]|
    |  2|       null|
    +---+-----------+
    

    我想将空数组(i d=2)替换为空数组,即

    +---+-----------+
    | id|        arr|
    +---+-----------+
    |  1|[[1.0,2.0]]|
    |  2|         []|
    +---+-----------+
    

    我试过了:

    val arrSchema = df.schema(1).dataType
    
    df
    .withColumn("arr",when($"arr".isNull,array().cast(arrSchema)).otherwise($"arr"))
    .show()
    

    它给出:

    java.lang.ClassCastException:org.apache.spark.sql.types.nullType$ 不能强制转换为org.apache.spark.sql.types.structType

    编辑:我不想“硬编码”我的数组列的任何模式(至少不是结构的模式),因为这可能因情况而异。我只能使用来自 df 运行时

    顺便说一下,我用的是Spark 2.1,所以我不能用 typedLit

    4 回复  |  直到 6 年前
        1
  •  3
  •   Alper t. Turker    6 年前
    • 已知外部类型的火花2.2+。

      一般来说,你可以使用 typedLit 提供空数组。

      import org.apache.spark.sql.functions.typedLit
      
      typedLit(Seq.empty[(Double, Double)])
      

      要为嵌套对象使用特定的名称,可以使用case类:

      case class Item(x: Double, y: Double)
      
      typedLit(Seq.empty[Item])
      

      rename by cast 以下内容:

      typedLit(Seq.empty[(Double, Double)])
        .cast("array<struct<x: Double, y: Double>>")
      
    • Spark 2.1+仅限架构

      只有架构可以尝试:

      val schema = StructType(Seq(
        StructField("arr", StructType(Seq(
          StructField("x", DoubleType),
          StructField("y", DoubleType)
        )))
      ))
      
      def arrayOfSchema(schema: StructType) =
        from_json(lit("""{"arr": []}"""), schema)("arr")
      
      arrayOfSchema(schema).alias("arr")
      

      在哪里? schema 可以从现有的 DataFrame 再加上 StructType 以下内容:

      StructType(Seq(
        StructField("arr", df.schema("arr").dataType)
      ))
      
        2
  •  1
  •   Raphael Roth    6 年前

    一种方法是使用UDF:

    val arrSchema = df.schema(1).dataType // ArrayType(StructType(StructField(x,DoubleType,true), StructField(y,DoubleType,true)),true)
    
    val emptyArr = udf(() => Seq.empty[Any],arrSchema)
    
    df
    .withColumn("arr",when($"arr".isNull,emptyArr()).otherwise($"arr"))
    .show()
    
    +---+-----------+
    | id|        arr|
    +---+-----------+
    |  1|[[1.0,2.0]]|
    |  2|         []|
    +---+-----------+
    
        3
  •  1
  •   Leo C    6 年前

    另一种方法是 coalesce 以下内容:

    val df = Seq(
      (Some(1), Some(Array((1.0, 2.0)))),
      (Some(2), None)
    ).toDF("id", "arr")
    
    df.withColumn("arr", coalesce($"arr", typedLit(Array.empty[(Double, Double)]))).
      show
    // +---+-----------+
    // | id|        arr|
    // +---+-----------+
    // |  1|[[1.0,2.0]]|
    // |  2|         []|
    // +---+-----------+
    
        4
  •  0
  •   Zach    6 年前

    带case类的udf也可能很有趣:

    case class Item(x: Double, y: Double)
    val udf_emptyArr = udf(() => Seq[Item]())
    df
    .withColumn("arr",coalesce($"arr",udf_emptyArr()))
    .show()