代码之家  ›  专栏  ›  技术社区  ›  helpneeded

Spark GroupBy,同时使用null维护架构

  •  0
  • helpneeded  · 技术社区  · 7 年前

    我有一个包含多个JSON对象的文件,其模式如下:

    {A: struct, B: struct, C: struct, D: struct}
    

    其值为 A 从不为空;然而,只有一个 B, C, D 也可以为非null。例如,我们会在数据框中看到类似的内容:

    +----+----+----+----+
    | A  | B  | C  | D  |
    +----+----+----+----+
    |[..]|[..]|null|null|
    |[..]|null|[..]|null|
    |[..]|null|null|[..]|
    +----+----+----+----+
    

    我正在尝试将数据帧按 A. 同时保持相同的架构/列结构 (A,B,C,D) 这样,给定行中的所有值都是非null的。

    可能存在多对一关系 A. 以及任何 B,C,D ,在这种情况下,我会将模式更改为

    {A: struct, B: list, C: list, D: list} ,但仍保留列名。

    我对Spark和Scala还比较陌生,只能以一种程序化的方式来构建我的想法,在这种方式中,我遍历每一行并进行散列 A. ,并以这种方式构建一个完整的行,但我不相信这是一个干净的解决方案,也不能使用spark API有效地表达它。

    2 回复  |  直到 7 年前
        1
  •  1
  •   Dominic Egger    7 年前

    注释部分有点笨拙,下面是一个示例

    scala> case class Foo(a:String, b:String, c:String)
    defined class Foo
    
    scala> val ds = spark.createDataset(List(Foo("1","1",null), Foo("1",null,null), Foo("1","3",null), Foo("1", null, null)))
    ds: org.apache.spark.sql.Dataset[Foo] = [a: string, b: string ... 1 more field]
    
    scala> val collected = ds.groupBy(ds("a")).agg(collect_list(ds("b")).alias("b"), collect_list(ds("c")).alias("c"))
    collected: org.apache.spark.sql.DataFrame = [a: string, b: array<string> ... 1 more field]
    
    scala> val filtered = collected.where(size(collected("b")) > 0 and size(collected("c")) > 0)
    filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: string, b: array<string> ... 1 more field]
    
    scala> collected.show
    +---+------+---+
    |  a|     b|  c|
    +---+------+---+
    |  1|[1, 3]| []|
    +---+------+---+
    
    
    scala> filtered.show
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    +---+---+---+
    
        2
  •  1
  •   Oli    5 年前
    val df = spark.createDataFrame(
         sc.parallelize(
             Seq(Row(1, 2, 3, 4), Row(1, 3, 4, null),
                 Row(2, null, 4, null), Row(2, 2, 2, null))),
             StructType(Seq("A","B","C","D")
                            .map(StructField(_, IntegerType, true))
         )
    )
    
    df.show()
    +---+----+---+----+
    |  A|   B|  C|   D|
    +---+----+---+----+
    |  1|   2|  3|   4|
    |  1|   3|  4|null|
    |  2|null|  4|null|
    |  2|   2|  2|null|
    +---+----+---+----+
    
    df
        .groupBy("A")
        .agg(collect_list('B) as "B", 
             collect_list('C) as "C",
             collect_list('D) as "D")
        .show
    
    +---+------+------+---+
    |  A|     B|     C|  D|
    +---+------+------+---+
    |  1|[2, 3]|[3, 4]|[4]|
    |  2|   [2]|[4, 2]| []|
    +---+------+------+---+
    

    默认情况下,collect\u list不收集null值,这正是您想要的(如果所有值都为null,则会得到一个空列表)。使用collect\u set避免重复。