case类中的计数应该是整数。。。如果你想将结果保留为RDD,我建议使用
reduceByKey
而不是
countByValue
它返回a
Map[String, Long]
而不是RDD。
我也建议分手
,
而不是
,
以避免项目名称中的前导空格。
case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(_.split(", "))
.map((_, 1))
.reduceByKey(_ + _)
.map(line => SalesItemSummary(line._1, line._2))
rdd_2.collect()
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Shoes,1), SalesItemSummary(Television,1), SalesItemSummary(Bicycle,1), SalesItemSummary(Helmet,1), SalesItemSummary(Hat,1), SalesItemSummary(Jumper,2), SalesItemSummary(Playstation 5,1))
要对RDD进行排序,您可以使用
sortBy
:
val top3 = rdd_2.sortBy(_.SalesItemCount, false).take(3)
top3
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Jumper,2), SalesItemSummary(Shoes,1))