代码之家  ›  专栏  ›  技术社区  ›  Abir Chokraborty

基于两列值对Spark数据集排序的有效方法是什么?

  •  1
  • Abir Chokraborty  · 技术社区  · 6 年前

    我有一个三列的大型数据集,格式如下:

    col1   col2   col3
    ------------------
     a1     1      i1
     a1     1      i2
     a1     2      i3
     a3     2      i4
     a3     1      i5
     a2     3      i6
     a2     3      i7
     a2     1      i8
    

    我写了以下内容:

       val datase2 = dataset.groupBy("col1","col2").agg(collect_list("col3").as("col3"))
            .sort("col1", "col2")
            .groupBy("col1").agg(collect_list("col2"), collect_list("col3"))
            .toDF("col1", "col2", "col3").as[(String, Array[String], Array[String])]
    

    获取 col2 根据结果数据集,我编写了以下内容:

    dataset2.select("col3").distinct().show()
    

    上述代码适用于小数据集,但对于大数据集,我得到了以下类型的结果(只是为了说明不一致的结果数据集的场景):

    col1     col2           col3
    -----------------------------------
    a1     [1, 2]      [[i1, i2], [i3]]
    a2     [3, 1]      [[i6, i7], [i8]]
    a3     [2, 1]      [[i4], [i5]]
    

    和我一样 sort("col1", "col2") 输出应为

    col1     col2           col3
    -----------------------------------
    a1     [1, 2]      [[i1, i2], [i3]]
    a2     [1, 3]      [[i8], [i6, i7]]
    a3     [1, 2]      [[i5], [i4]]
    

    col2 将按排序,并且 col2 col3 将根据其数组索引保持一致。例如,上述数据集的最后一行是

     col2           col3
    -------------------------
    [1, 2]      [[i5], [i4]]
    

    但不是

     col2           col3
    -------------------------
    [1, 2]      [[i4], [i5]]
    

    我怎样才能实现我的目标?

    1 回复  |  直到 6 年前
        1
  •  2
  •   Alper t. Turker    6 年前

    使用合并记录 struct 和使用 sort_array :

    dataset
      .groupBy($"col1")
      .agg(sort_array(collect_list(struct($"col2", $"col3"))).alias("data"))
      .select($"col1", $"data.col2", $"data.col3")
    

    学分转到 user6910411) 对于 this answer