代码之家  ›  专栏  ›  技术社区  ›  Alex

如何循环通过数据集创建摘要数据集

  •  1
  • Alex  · 技术社区  · 7 年前

    我刚刚开始学习和使用Spark,目前面临一个问题。任何建议或暗示都将不胜感激。

    基本上,我有一个包含不同用户所有类型事件的数据集,如AppLaunch、GameStart、GameEnd等,我想创建每个用户每次启动应用程序时的动作摘要。

    例如:我有以下数据集:
    UserId | Event Type | Time | GameType | Event Id|
    11111 | AppLauch | 11:01:53| null | 101 |
    11111 | GameStart | 11:01:59| Puzzle | 102 |
    11111 | GameEnd | 11:05:31| Puzzle | 103 |
    11111 | GameStart | 11:05:58| Word | 104 |
    11111 | GameEnd | 11:09:13| Word | 105 |
    11111 | AppEnd | 11:09:24| null | 106 |
    11111 | AppLauch | 12:03:43| null | 107 |
    22222 | AppLauch | 12:03:52| null | 108 |
    22222 | GameStart | 12:03:59| Puzzle | 109 |
    11111 | GameStart | 12:04:01| Puzzle | 110 |
    22222 | GameEnd | 12:06:11| Puzzle | 111 |
    11111 | GameEnd | 12:06:13| Puzzle | 112 |
    11111 | AppEnd | 12:06:23| null | 113 |
    22222 | AppEnd | 12:06:33| null | 114 |

    我想要的是类似于以下内容的数据集:
    EventId | USerId| Event Type | Time | FirstGamePlayed| LastGamePlayed|
    101 |11111 | AppLauch | 11:01:53| Puzzle | Word |
    107 |11111 | AppLauch | 12:03:43| Puzzle | Puzzle |
    108 |22222 | AppLauch | 12:03:52| Puzzle | Puzzle |

    只需要知道玩的第一个游戏和最后一个游戏,即使一个应用程序发布中玩的游戏超过3个。

    我最初的想法是按照用户Id和时间窗口(AppLaunch to AppEnd)对它们进行分组,然后找到一种方法来扫描数据集,如果有一个gameStart事件并且它落在任何窗口中,它将是第一个玩过的游戏,在AppEnd之前的最后一个gameStart事件将是最后一个玩过的游戏。但我没有找到实现这一目标的方法。

    任何提示/建议都很好。

    谢谢

    1 回复  |  直到 7 年前
        1
  •  1
  •   Raphael Roth    7 年前

    我认为这可以通过使用window函数和如下聚合来解决:

    df
       // enumerate AppLaunches 
       .withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
       // get first last game per AppLaunch
       .withColumn("firstGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".asc)))
       .withColumn("lastGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".desc)))
        // now aggregate
       .groupBy($"AppLauchNr")
       .agg(
            first($"UserId").as("UserId"),
            min($"EventId").as("EventId"),
            lit("AppLauch").as("EventType"), // this is always AppLauch
            min($"Time").as("Time"),
            first($"firstGamePlayed", true).as("firstGamePlayed"),
            first($"lastGamePlayed", true).as("lastGamePlayed")
       )
      .drop($"AppLauchNr")
    

    第一场和最后一场比赛也可以使用 orderBy().groupBy() 而不是窗口函数,但我仍然不确定spark是否会在聚合过程中保留排序(文档中没有提到这一点,请参见例如。 Spark DataFrame: does groupBy after orderBy maintain that order? 和中的讨论 https://issues.apache.org/jira/browse/SPARK-16207 )

     df
       .withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
       .orderBy($"UserId",$"AppLauchNr",$"Time")
       .groupBy($"UserId",$"AppLauchNr")
       .agg(
            first($"EventId").as("EventId"),
            first($"EventType").as("EventType"),
            first($"Time").as("Time"),
            first($"GameType", true).as("firstGamePlayed"),
            last($"GameType", true).as("lastGamePlayed")
       )
       .drop($"AppLauchNr")