Suppose I have a DataFrame like this:
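// spark-shell provides sc and spark.implicits._ (needed for toDF and $"col")
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{first, last}

val df = sc.parallelize(Seq(
  (1.0, 1, "Matt"),
  (1.0, 2, "John"),
  (1.0, 3, null.asInstanceOf[String]),
  (-1.0, 2, "Adam"),
  (-1.0, 4, "Steve"))
).toDF("id", "timestamp", "name")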
I want to get the last non-null name for each id, ordered by timestamp. This is my window:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp".desc)
Then I create a DataFrame of the distinct windowed values:
val filteredDF = df.filter($"name".isNotNull).withColumn("firstName", first("name") over (partitionWindow)).drop("timestamp","name").distinct
and join it back to the original data:
val joinedDF = df.join(filteredDF, df.col("id") === filteredDF.col("id")).drop(filteredDF.col("id"))
joinedDF.show()
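This works, but I don't like this solution. Can anyone suggest a better one?
Also, can someone tell me why the last function doesn't work? I tried the following, but the results are incorrect: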
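A single-window alternative avoids both the distinct and the join. The sketch below assumes Spark 2.1 or later (for Window.unboundedPreceding / Window.unboundedFollowing and the two-argument last(col, ignoreNulls)); the object and method names (LastNonNullSketch, sample, withLastNonNull) are hypothetical. It widens the frame to the whole partition and tells last to skip nulls, so every row receives the latest non-null name for its id.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

object LastNonNullSketch {
  // Rebuilds the sample data from the question
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1.0, 1, "Matt"),
      (1.0, 2, "John"),
      (1.0, 3, null.asInstanceOf[String]),
      (-1.0, 2, "Adam"),
      (-1.0, 4, "Steve")
    ).toDF("id", "timestamp", "name")
  }

  // Adds lastName = last non-null name per id (ordered by timestamp) in one pass, no join
  def withLastNonNull(df: DataFrame): DataFrame = {
    // The frame covers the whole partition, not just the rows up to the current one
    val fullPartition = Window
      .partitionBy(col("id"))
      .orderBy(col("timestamp"))
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    // ignoreNulls = true skips the null name at timestamp 3 for id 1.0
    df.withColumn("lastName", last(col("name"), ignoreNulls = true).over(fullPartition))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only
    val spark = SparkSession.builder().master("local[*]").appName("last-non-null").getOrCreate()
    withLastNonNull(sample(spark)).orderBy(col("id").desc, col("timestamp")).show()
    spark.stop()
  }
}
```

With the sample data, every id 1.0 row should get John (the null at timestamp 3 is skipped) and every id -1.0 row should get Steve. Unlike the join version, each row keeps its original name column alongside lastName.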
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
val windowDF = df.withColumn("lastName", last("name") over (partitionWindow))
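The likely reason last appears broken: when a window has orderBy but no explicit frame, Spark uses a growing frame from the start of the partition up to the current row (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW). The last row of that frame is the current row itself, so last("name") just returns each row's own name, including the null at timestamp 3. The sketch below (hypothetical names DefaultFrameSketch, sample and compareFrames, assuming Spark 2.1 or later) spells that frame out explicitly and shows it produces the same column as the window from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

object DefaultFrameSketch {
  // Rebuilds the sample data from the question
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1.0, 1, "Matt"),
      (1.0, 2, "John"),
      (1.0, 3, null.asInstanceOf[String]),
      (-1.0, 2, "Adam"),
      (-1.0, 4, "Steve")
    ).toDF("id", "timestamp", "name")
  }

  // Compares the question's window with the frame Spark applies by default
  def compareFrames(df: DataFrame): DataFrame = {
    // orderBy with no explicit frame, as in the question
    val implicitFrame = Window.partitionBy(col("id")).orderBy(col("timestamp"))
    // The default frame once orderBy is present: partition start up to the current row
    val explicitFrame = implicitFrame.rangeBetween(Window.unboundedPreceding, Window.currentRow)
    df.withColumn("lastImplicit", last(col("name")).over(implicitFrame))
      .withColumn("lastExplicit", last(col("name")).over(explicitFrame))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only
    val spark = SparkSession.builder().master("local[*]").appName("default-frame").getOrCreate()
    compareFrames(sample(spark)).orderBy(col("id").desc, col("timestamp")).show()
    spark.stop()
  }
}
```

Because timestamps are unique within each id, both columns should simply echo name. Extending the frame with rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) and passing ignoreNulls = true is what turns last into a last-non-null lookup.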