代码之家  ›  专栏  ›  技术社区  ›  JetS79

在Spark中对RDD进行排序

  •  0
  • JetS79  · 技术社区  · 3 年前

    我有一个数据集,列出了客户购买的一般商品。csv中的每条记录从左到右列出了客户购买的物品。例如(缩短样本):

    Bicycle, Helmet, Gloves
    Shoes, Jumper, Gloves
    Television, Hat, Jumper, Playstation 5
    

    我想把它放在scala的RDD中,并对它们进行计数。

    case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: String)
    val rdd_1 = sc.textFile("Data/SalesItems.csv")
    val rdd_2 = rdd_1.flatMap(line => line.split(",")).countByValue();
    

    上面是一个简短的代码示例。第一行是case类(尚未使用)。 第二行从csv中获取数据并将其放入rdd_1中。很简单。 第三行执行平面映射,在逗号上分割数据,然后对每个数据进行计数。例如,上面的“手套”和“跳跃者”旁边会有数字2。其他的是1。看起来像是元组的集合。 到现在为止,一直都还不错。

    接下来,我想对rdd_2进行排序,列出购买量最大的前3个项目。 我可以用RDD做这个吗?或者我需要将RDD转换为数据帧来实现排序吗? 如果是这样,我该怎么做?

    例如,我如何将第1行中的case类应用于rdd_2,它似乎是一个元组列表?我应该采取这种方法吗?

    提前感谢

    1 回复  |  直到 3 年前
        1
  •  3
  •   mck    3 年前

    case类中的计数应该是整数。。。如果你想将结果保留为RDD,我建议使用 reduceByKey 而不是 countByValue 它返回a Map[String, Long] 而不是RDD。

    我也建议分手 , 而不是 , 以避免项目名称中的前导空格。

    case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)
    
    val rdd_1 = sc.textFile("Data/SalesItems.csv")
    
    val rdd_2 = rdd_1.flatMap(_.split(", "))
                     .map((_, 1))
                     .reduceByKey(_ + _)
                     .map(line => SalesItemSummary(line._1, line._2))
    
    rdd_2.collect()
    // Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Shoes,1), SalesItemSummary(Television,1), SalesItemSummary(Bicycle,1), SalesItemSummary(Helmet,1), SalesItemSummary(Hat,1), SalesItemSummary(Jumper,2), SalesItemSummary(Playstation 5,1))
    

    要对RDD进行排序,您可以使用 sortBy :

    val top3 = rdd_2.sortBy(_.SalesItemCount, false).take(3)
    
    top3
    // Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Jumper,2), SalesItemSummary(Shoes,1))