
Spark top-N values by group

  • Masterbuilder  ·  asked 6 years ago

    I have a DataFrame containing event details, and I am trying to get the top 5 most recently reported events per date and user ID. Here is the code I have tried so far.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    // sample events: (eventtime as yyyyMMddHHmmss, userid, eventid, event_title, eventdata)
    val df = sc.parallelize(Seq( ("20180515114049", "user001","e001","cross-over","some data related to even"),
      ("20180515114049", "user004","e002","cross-limit","some data related to event"),
      ("20180515114049", "user001","e001","cross-over","some data related to event"),
      ("20180615114049", "user001","e001","cross-over","some data related to event"),
      ("20180715114049", "user003","e004","cross-cl","some data related to event"),
      ("20180715114049", "user005","e001","cross-over","some data related to event"),
      ("20180715114049", "user005","e002","cross-limit","some data related to event"),
       ("20180715114049", "user005","e003","no-cross","some data related to event"),
       ("20180715114049", "user005","e004","cross-over","some data related to event"),
       ("20180715114049", "user005","e005","dl-over","some data related to event"),
       ("20180715114049", "user005","e003","no-cross","some data related to event"),
      ("20180815114049", "user006","e001","cross-over","some data related to event"),
      ("20180915114049", "user001","e001","cross-over","some data related to event"),
      ("20180105114049", "user001","e006","straight","some data related to event")
     )).toDF("eventtime", "userid","eventid","event_title","eventdata")
    df.show()
    +--------------+-------+-------+-----------+--------------------+
    |     eventtime| userid|eventid|event_title|           eventdata|
    +--------------+-------+-------+-----------+--------------------+
    |20180515114049|user001|   e001| cross-over|some data related...|
    |20180515114049|user004|   e002|cross-limit|some data related...|
    |20180515114049|user001|   e001| cross-over|some data related...|
    |20180615114049|user001|   e001| cross-over|some data related...|
    |20180715114049|user003|   e004|   cross-cl|some data related...|
    |20180715114049|user005|   e001| cross-over|some data related...|
    |20180715114049|user005|   e002|cross-limit|some data related...|
    |20180715114049|user005|   e003|   no-cross|some data related...|
    |20180715114049|user005|   e004| cross-over|some data related...|
    |20180715114049|user005|   e005|    dl-over|some data related...|
    |20180715114049|user005|   e003|   no-cross|some data related...|
    |20180815114049|user006|   e001| cross-over|some data related...|
    |20180915114049|user001|   e001| cross-over|some data related...|
    |20180105114049|user001|   e006|   straight|some data related...|
    +--------------+-------+-------+-----------+--------------------+
    // intended: the last reported eventtime per (userid, eventid)
    val df2 = df.groupBy($"userid",$"eventid").agg(last($"eventtime") as "lasteventtime")
    df2.show(false)
    +-------+-------+--------------+
    |userid |eventid|lasteventtime |
    +-------+-------+--------------+
    |user005|e004   |20180715114049|
    |user005|e001   |20180715114049|
    |user001|e006   |20180105114049|
    |user001|e001   |20180915114049|
    |user005|e002   |20180715114049|
    |user006|e001   |20180815114049|
    |user004|e002   |20180515114049|
    |user005|e005   |20180715114049|
    |user005|e003   |20180715114049|
    |user003|e004   |20180715114049|
    +-------+-------+--------------+
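
    A note on the aggregation above: last inside agg is not guaranteed to pick the most recent eventtime, because the row order within each group is undefined. A minimal sketch of a deterministic alternative, assuming the goal is simply the latest eventtime per (userid, eventid); the name df2Latest is illustrative, not from the original code:

    // max() is deterministic here, and because eventtime is a fixed-width
    // yyyyMMddHHmmss string, the lexicographic max equals the latest timestamp
    val df2Latest = df.groupBy($"userid", $"eventid")
      .agg(max($"eventtime") as "lasteventtime")
    df2Latest.show(false)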
    

    Below is the part I am struggling to tie together: feeding the last-reported aggregation above into the ranking, so that I end up with the top 5 last-reported events.

    // rank each event's reports from newest to oldest within (userid, event_title, eventid)
    val w = Window.partitionBy($"userid",$"event_title",$"eventid").orderBy($"eventtime".desc)
    val contentByRank = df.withColumn("rank", dense_rank().over(w)).filter($"rank" <= 5)
    contentByRank.show(20,false)
    

    Also, how do I keep only the top 5 after filtering on rank, given that in this case several events may end up with the same rank? (One possible approach is sketched after the output below.)

    +--------------+-------+-------+-----------+--------------------------+----+
    |eventtime     |userid |eventid|event_title|eventdata                 |rank|
    +--------------+-------+-------+-----------+--------------------------+----+
    |20180515114049|user004|e002   |cross-limit|some data related to event|1   |
    |20180715114049|user005|e004   |cross-over |some data related to event|1   |
    |20180815114049|user006|e001   |cross-over |some data related to event|1   |
    |20180715114049|user005|e003   |no-cross   |some data related to event|1   |
    |20180715114049|user005|e003   |no-cross   |some data related to event|1   |
    |20180715114049|user005|e005   |dl-over    |some data related to event|1   |
    |20180715114049|user003|e004   |cross-cl   |some data related to event|1   |
    |20180715114049|user005|e001   |cross-over |some data related to event|1   |
    |20180105114049|user001|e006   |straight   |some data related to event|1   |
    |20180715114049|user005|e002   |cross-limit|some data related to event|1   |
    |20180915114049|user001|e001   |cross-over |some data related to event|1   |
    |20180615114049|user001|e001   |cross-over |some data related to event|2   |
    |20180515114049|user001|e001   |cross-over |some data related to even |3   |
    |20180515114049|user001|e001   |cross-over |some data related to event|3   |
    +--------------+-------+-------+-----------+--------------------------+----+
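
    On the tie question above: dense_rank assigns the same rank to every row that ties on eventtime, so rank <= 5 can return more than five rows per partition. A minimal sketch of a strict "at most 5 rows per user" cut using row_number instead; the window w2 and the name top5PerUser are illustrative, not part of the original code:

    // row_number breaks ties arbitrarily but guarantees the row count;
    // partitioning by userid only matches a "top 5 events per user" reading
    val w2 = Window.partitionBy($"userid").orderBy($"eventtime".desc)
    val top5PerUser = df.withColumn("rn", row_number().over(w2)).filter($"rn" <= 5)
    top5PerUser.show(false)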
    
    1 Answer  |  6 years ago
  •   Masterbuilder  ·  answered 6 years ago

         // keep only the rows whose eventtime matches the last reported eventtime per (userid, eventid)
         val df2 = df.groupBy($"userid",$"eventid").agg(last($"eventtime") as "eventtime")
         val lasteventdf = df.join(df2, Seq("eventtime", "userid","eventid"))
         // rank the remaining reports from newest to oldest within each (userid, event_title, eventid)
         val w = Window.partitionBy($"userid",$"event_title",$"eventid").orderBy($"eventtime".desc)
         val contentByRank = lasteventdf.withColumn("rank", dense_rank().over(w)).filter($"rank" <= 5)
         contentByRank.show(20,false)
    
    +--------------+-------+-------+-----------+--------------------------+----+
    |eventtime     |userid |eventid|event_title|eventdata                 |rank|
    +--------------+-------+-------+-----------+--------------------------+----+
    |20180515114049|user004|e002   |cross-limit|some data related to event|1   |
    |20180715114049|user005|e004   |cross-over |some data related to event|1   |
    |20180815114049|user006|e001   |cross-over |some data related to event|1   |
    |20180715114049|user005|e003   |no-cross   |some data related to event|1   |
    |20180715114049|user005|e003   |no-cross   |some data related to event|1   |
    |20180715114049|user005|e005   |dl-over    |some data related to event|1   |
    |20180715114049|user003|e004   |cross-cl   |some data related to event|1   |
    |20180715114049|user005|e001   |cross-over |some data related to event|1   |
    |20180105114049|user001|e006   |straight   |some data related to event|1   |
    |20180715114049|user005|e002   |cross-limit|some data related to event|1   |
    |20180915114049|user001|e001   |cross-over |some data related to event|1   |
    +--------------+-------+-------+-----------+--------------------------+----+
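
    A possible follow-up on the approach above: after the join, each (userid, event_title, eventid) partition holds a single latest row, so dense_rank is always 1 and the rank <= 5 filter never drops anything. A minimal sketch, assuming the intent is "the 5 most recently reported distinct events per user"; the window wUser and the name top5LastReported are illustrative, not part of the original answer:

         // rank each user's latest-reported events by recency and keep at most 5;
         // row_number gives a strict cut even when several events share an eventtime
         val wUser = Window.partitionBy($"userid").orderBy($"eventtime".desc)
         val top5LastReported = lasteventdf
           .withColumn("rn", row_number().over(wUser))
           .filter($"rn" <= 5)
           .drop("rn")
         top5LastReported.show(false)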