代码之家  ›  专栏  ›  技术社区  ›  Kalpesh

如何在spark dataframe、scala中将行转换为列

  •  4
  • Kalpesh  · 技术社区  · 7 年前

    是否有任何方法可以将数据帧行转换为列。 我有以下结构作为输入:

    val inputDF = Seq(("pid1","enc1", "bat"),
                      ("pid1","enc2", ""),
                      ("pid1","enc3", ""),
                      ("pid3","enc1", "cat"),
                      ("pid3","enc2", "")
                  ).toDF("MemberID", "EncounterID", "entry" )
    
    inputDF.show:
    
    +--------+-----------+-----+
    |MemberID|EncounterID|entry|
    +--------+-----------+-----+
    |    pid1|       enc1|  bat|
    |    pid1|       enc2|     |
    |    pid1|       enc3|     |
    |    pid3|       enc1|  cat|
    |    pid3|       enc2|     |
    +--------+-----------+-----+
    
    expected result:
    
    +--------+----------+----------+----------+-----+
    |MemberID|Encounter1|Encounter2|Encounter3|entry|
    +--------+----------+----------+----------+-----+
    |    pid1|      enc1|      enc2|      enc3|  bat|
    |    pid3|      enc1|      enc2|      null|  cat|
    +--------+----------+----------+----------+-----+
    

    请建议是否有任何优化的直接API可用于将行转换为列。 我的输入数据量非常大,所以像收集这样的操作,我将无法执行,因为它将获取驱动程序上的所有数据。 我正在使用Spark 2。x个

    1 回复  |  直到 5 年前
        1
  •  6
  •   Oli    7 年前

    我不确定你需要的是你真正要求的。然而,为了以防万一,这里有一个想法:

    val entries = inputDF.where('entry isNotNull)
        .where('entry !== "")
        .select("MemberID", "entry").distinct
    
    val df = inputDF.groupBy("MemberID")
        .agg(collect_list("EncounterID") as "encounterList")
        .join(entries, Seq("MemberID"))
    df.show
    +--------+-------------------------+-----+
    |MemberID|           encounterList |entry|
    +--------+-------------------------+-----+
    |    pid1|       [enc2, enc1, enc3]|  bat|
    |    pid3|             [enc2, enc1]|  cat|
    +--------+-------------------------+-----+
    

    列表的顺序不是确定的,但您可以对其进行排序,然后使用 .withColumn("Encounter1", sort_array($"encounterList")(0)) ...

    其他想法

    如果要将entry的值放入相应的“Conference”列中,可以使用透视:

    inputDF
        .groupBy("MemberID")
        .pivot("EncounterID", Seq("enc1", "enc2", "enc3"))
        .agg(first("entry")).show
    
    +--------+----+----+----+
    |MemberID|enc1|enc2|enc3|
    +--------+----+----+----+
    |    pid1| bat|    |    |
    |    pid3| cat|    |    |
    +--------+----+----+----+
    

    正在添加 Seq("enc1", "enc2", "enc3") 是可选的,但由于您知道列的内容,因此会加快计算速度。