代码之家  ›  专栏  ›  技术社区  ›  Anneso

在scala 2中使用Window计算if条件的行数

  •  0
  • Anneso  · 技术社区  · 6 年前

    我已经发布了一个类似的问题,但是有人给了我一个避免使用“if条件”的技巧。

    我有一个数据帧。

    var df = sc.parallelize(Array(
    (1,  "2017-06-29 10:53:53.0","2017-06-25 14:60:53.0","boulanger.fr"), 
    (2,  "2017-07-05 10:48:57.0","2017-09-05 08:60:53.0","patissier.fr"), 
    (3,  "2017-06-28 10:31:42.0","2017-02-28 20:31:42.0","boulanger.fr"), 
    (4,  "2017-08-21 17:31:12.0","2017-10-21 10:29:12.0","patissier.fr"), 
    (5,  "2017-07-28 11:22:42.0","2017-05-28 11:22:42.0","boulanger.fr"), 
    (6,  "2017-08-23 17:03:43.0","2017-07-23 09:03:43.0","patissier.fr"), 
    (7,  "2017-08-24 16:08:07.0","2017-08-22 16:08:07.0","boulanger.fr"), 
    (8,  "2017-08-31 17:20:43.0","2017-05-22 17:05:43.0","patissier.fr"), 
    (9,  "2017-09-04 14:35:38.0","2017-07-04 07:30:25.0","boulanger.fr"), 
    (10, "2017-09-07 15:10:34.0","2017-07-29 12:10:34.0","patissier.fr"))).toDF("id", "date1","date2", "mail")
    
    df = df.withColumn("date1", (unix_timestamp($"date1", "yyyy-MM-dd HH:mm:ss").cast("timestamp")))
    df = df.withColumn("date2", (unix_timestamp($"date2", "yyyy-MM-dd HH:mm:ss").cast("timestamp")))
    
    df = df.orderBy("date1", "date2")
    

    +---+---------------------+---------------------+------------+
    |id |date1                |date2                |mail        |
    +---+---------------------+---------------------+------------+
    |3  |2017-06-28 10:31:42.0|2017-02-28 20:31:42.0|boulanger.fr|
    |1  |2017-06-29 10:53:53.0|2017-06-25 15:00:53.0|boulanger.fr|
    |2  |2017-07-05 10:48:57.0|2017-09-05 09:00:53.0|patissier.fr|
    |5  |2017-07-28 11:22:42.0|2017-05-28 11:22:42.0|boulanger.fr|
    |4  |2017-08-21 17:31:12.0|2017-10-21 10:29:12.0|patissier.fr|
    |6  |2017-08-23 17:03:43.0|2017-07-23 09:03:43.0|patissier.fr|
    |7  |2017-08-24 16:08:07.0|2017-08-22 16:08:07.0|boulanger.fr|
    |8  |2017-08-31 17:20:43.0|2017-05-22 17:05:43.0|patissier.fr|
    |9  |2017-09-04 14:35:38.0|2017-07-04 07:30:25.0|boulanger.fr|
    |10 |2017-09-07 15:10:34.0|2017-07-29 12:10:34.0|patissier.fr|
    +---+---------------------+---------------------+------------+
    

    对于每个id,我要在所有其他行中计算行数:

    1. 日期1在[我的当前日期1-60天,我的当前日期1-1天]
    2. 日期2<我的当前日期1
    3. 与我当前的邮件相同的邮件

    如果我查看第5行,我想返回行数:

    1. 日期2<2017-07-28 11:22:42.0
    2. 邮件=boulanger.fr

    所以我想做些类似的事情:

    val w = Window.partitionBy("mail").orderBy(col("date1").cast("long")).rangeBetween(-60*24*60*60,-1*24*60*60)
    var df= df.withColumn("all_previous", count("mail") over w)
    

    但这将响应条件1和条件3,而不是第二个。。。我必须添加一些内容来包含第二个比较日期2和我的日期1的条件。。。

    1 回复  |  直到 6 年前
        1
  •  1
  •   Leo C    6 年前

    使用通用窗口规范 last(date1) date1 每个窗口分区和 sum

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    
    def days(n: Long): Long = n * 24 * 60 * 60
    
    val w = Window.partitionBy("mail").orderBy($"date1".cast("long"))
    val w1 = w.rangeBetween(days(-60), days(0))
    val w2 = w.rangeBetween(days(-60), days(-1))
    
    df.withColumn("all_previous", sum(
          when($"date2".cast("long") < last($"date1").over(w1).cast("long"), 1).
            otherwise(0)
        ).over(w2)
      ).na.fill(0).
      show
    // +---+-------------------+-------------------+------------+------------+
    // | id|              date1|              date2|        mail|all_previous|
    // +---+-------------------+-------------------+------------+------------+
    // |  3|2017-06-28 10:31:42|2017-02-28 20:31:42|boulanger.fr|           0|
    // |  1|2017-06-29 10:53:53|2017-06-25 15:00:53|boulanger.fr|           1|
    // |  5|2017-07-28 11:22:42|2017-05-28 11:22:42|boulanger.fr|           2|
    // |  7|2017-08-24 16:08:07|2017-08-22 16:08:07|boulanger.fr|           3|
    // |  9|2017-09-04 14:35:38|2017-07-04 07:30:25|boulanger.fr|           2|
    // |  2|2017-07-05 10:48:57|2017-09-05 09:00:53|patissier.fr|           0|
    // |  4|2017-08-21 17:31:12|2017-10-21 10:29:12|patissier.fr|           0|
    // |  6|2017-08-23 17:03:43|2017-07-23 09:03:43|patissier.fr|           0|
    // |  8|2017-08-31 17:20:43|2017-05-22 17:05:43|patissier.fr|           1|
    // | 10|2017-09-07 15:10:34|2017-07-29 12:10:34|patissier.fr|           2|
    // +---+-------------------+-------------------+------------+------------+
    

    [更新]

    此解决方案是不正确的,即使示例数据集的结果似乎是正确的。特别地, last($"date1").over(w1)