代码之家  ›  专栏  ›  技术社区  ›  Srinivas

使用Spark Window函数获取最后一个值

  •  3
  • Srinivas  · 技术社区  · 6 年前

    假设我有这样的数据框架。

    val df = sc.parallelize(Seq(
                (1.0, 1,"Matt"), 
                (1.0, 2,"John"),
                (1.0, 3,null.asInstanceOf[String]),
                (-1.0, 2,"Adam"), 
                (-1.0, 4,"Steve"))
              ).toDF("id", "timestamp","name")
    

    我想为每个按时间戳排序的ID获取最后一个非空值。这是我的窗户

    val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp".desc)
    

    我正在创建一个不同的窗口数据

    val filteredDF = df.filter($"name".isNotNull).withColumn("firstName", first("name") over (partitionWindow)).drop("timestamp","name").distinct
    

    把它和实际数据结合起来

    val joinedDF = df.join(filteredDF, windowDF.col("id") === filteredDF.col("id")).drop(filteredDF.col("id"))
    
    joinedDF.show()
    

    它很好用,但我不喜欢这个解决方案,有人能给我推荐更好的吗?

    还有,有人能告诉我为什么最后一个功能不能工作吗?我试过了结果不正确

     val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
    
    val windowDF = df.withColumn("lastName", last("name") over (partitionWindow))
    
    1 回复  |  直到 6 年前
        1
  •  4
  •   Alper t. Turker    6 年前

    如果要传播最后一个已知值(它与使用的逻辑不同 join )你应该:

    • ORDER BY timestamp .
    • 采取 last 忽略 nulls :
    val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
    
    df.withColumn("lastName", last("name", true) over (partitionWindow)).show
    // +----+---------+-----+--------+
    // |  id|timestamp| name|lastName|
    // +----+---------+-----+--------+
    // |-1.0|        2| Adam|    Adam|
    // |-1.0|        4|Steve|   Steve|
    // | 1.0|        1| Matt|    Matt|
    // | 1.0|        2| John|    John|
    // | 1.0|        3| null|    John|
    // +----+---------+-----+--------+
    

    如果要全局获取最后一个值:

    • 按时间戳排序 .
    • 设置无边界帧。
    • 采取 最后的 忽略 空值 :
    val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    df.withColumn("lastName", last("name", true) over (partitionWindow)).show
    // +----+---------+-----+--------+
    // |  id|timestamp| name|lastName|
    // +----+---------+-----+--------+
    // |-1.0|        2| Adam|   Steve|
    // |-1.0|        4|Steve|   Steve|
    // | 1.0|        1| Matt|    John|
    // | 1.0|        2| John|    John|
    // | 1.0|        3| null|    John|
    // +----+---------+-----+--------+