I think this can be solved with a window function and the following aggregation:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
df
// enumerate app launches ("AppLauch" is the EventType value as it appears in the data)
.withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
// get first/last game per app launch
.withColumn("firstGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".asc)))
.withColumn("lastGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".desc)))
// now aggregate; group by UserId as well, since AppLauchNr only enumerates launches within one user
.groupBy($"UserId", $"AppLauchNr")
.agg(
  min($"EventId").as("EventId"),
  lit("AppLauch").as("EventType"), // this is always AppLauch
  min($"Time").as("Time"),
  first($"firstGamePlayed", true).as("firstGamePlayed"),
  first($"lastGamePlayed", true).as("lastGamePlayed")
)
.drop($"AppLauchNr")
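The `sum(when(...))` window is just a conditional running count that assigns each event to its enclosing app launch. As a hedged illustration of that idea in plain Scala (no Spark; the toy `Event` type, field names, and sample data here are hypothetical), the per-user enumeration behaves like this:

```scala
// Toy sketch of the "AppLauchNr" running count, assuming events are
// already sorted by Time within a single user. Names are hypothetical.
case class Event(eventType: String, game: Option[String])

val events = Seq(
  Event("AppLauch", None),
  Event("GamePlayed", Some("Solitaire")),
  Event("GamePlayed", Some("Chess")),
  Event("AppLauch", None),
  Event("GamePlayed", Some("Poker"))
)

// running count of "AppLauch" events, mirroring sum(when(...)) over the window
val launchNr = events.scanLeft(0) { (n, e) =>
  if (e.eventType == "AppLauch") n + 1 else n
}.tail
// launchNr: Seq(1, 1, 1, 2, 2)

// group events by launch number, then take the first/last non-empty game,
// mirroring first($"GameType", true) over the asc/desc windows
val perLaunch = events.zip(launchNr).groupBy(_._2).toSeq.sortBy(_._1).map {
  case (nr, evs) =>
    val games = evs.flatMap(_._1.game)
    (nr, games.headOption, games.lastOption)
}
// perLaunch: Seq((1, Some("Solitaire"), Some("Chess")), (2, Some("Poker"), Some("Poker")))
```

The `ignoreNulls = true` flag on `first` matters for the same reason `flatMap(_.game)` does in the sketch: the launch event itself carries no game, so the first non-null `GameType` is the one you want.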
The first and last game could also be obtained with orderBy().groupBy() instead of window functions, but I'm still not sure whether Spark preserves the ordering during aggregation (the documentation does not mention it; see e.g. "Spark DataFrame: does groupBy after orderBy maintain that order?" and the discussion in https://issues.apache.org/jira/browse/SPARK-16207):
df
.withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
.orderBy($"UserId", $"AppLauchNr", $"Time")
.groupBy($"UserId", $"AppLauchNr")
.agg(
  first($"EventId").as("EventId"),
  first($"EventType").as("EventType"),
  first($"Time").as("Time"),
  first($"GameType", true).as("firstGamePlayed"),
  last($"GameType", true).as("lastGamePlayed")
)
.drop($"AppLauchNr")