Here is a functional way to solve this.
For this input:
    val repeatSmokers: List[(String, String, String, String, String, String)] =
      List(
        ("ID76182", "sachin", "kita MR.", "56308", "1990", "300"),
        ("ID76182", "KOUN", "Jana MR.", "56714", "1990", "100"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
        ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "200"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
        ("ID76182", "JAMES", "JIM", "30548", "1990", "300"),
        ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", "2600"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "370"),
        ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "2600"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
      )
With a case class representing a record:
    case class Record(
      id: String,
      fname: String,
      lname: String,
      code: String,
      year: String,
      amount: String)
We can run the following:
    val result = repeatSmokers
      .map(recordTuple => Record.tupled(recordTuple))
      .zipWithIndex
      .groupBy { case (record, order) => (record.fname, record.lname, record.code) }
      // Convert to a List first: flatMap on the Map itself would rebuild a Map
      // keyed by the serialized line and silently drop identical lines.
      .toList
      .flatMap {
        case (_, List(singleRecord)) => Nil // get rid of non-repeat records
        case (key, records) => {
          // global index of the key's first record, used for the final ordering
          val firstKeyIdx = records.head._2
          // running totals of the amounts within this key
          val amounts = records.map {
            case (record, order) => record.amount.toInt
          }.foldLeft(List[Int]()) {
            case (Nil, addAmount) => List(addAmount)
            case (previousAmounts :+ lastAmount, addAmount) =>
              previousAmounts :+ lastAmount :+ (lastAmount + addAmount)
          }
          records
            .zip(amounts)
            .zipWithIndex
            .map {
              case (((rec, order), amount), idx) =>
                val serializedRecord =
                  List(rec.id, rec.code, rec.year, amount, idx + 1)
                (serializedRecord.mkString(","), (firstKeyIdx, idx))
            }
        }
      }
      .sortBy { case (serializedRecord, finalOrder) => finalOrder }
      .map { case (serializedRecord, finalOrder) => serializedRecord }
This produces:
ID76182,27539,1990,255,1
ID76182,27539,1990,365,2
ID76182,45873,1990,20,1
ID76182,45873,1990,6770,2
ID76182,45873,1990,9370,3
ID76182,49337,1990,200,1
ID76182,49337,1990,570,2
ID76182,47542,1990,280,1
ID76182,47542,1990,536,2
Some explanations:
A neat way to instantiate a case class from a tuple (this builds the list of Records from the list of tuples):
    .map(recordTuple => Record.tupled(recordTuple))
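
To see what `tupled` does in isolation, here is a minimal sketch. It assumes Scala 2, where the companion of a case class that has no explicit companion object exposes `tupled`; the `Person` class and its sample values are hypothetical stand-ins for `Record`.

```scala
// Hypothetical three-field case class, standing in for Record.
case class Person(fname: String, lname: String, code: String)

val tuples = List(("GANGS", "SKILL", "27539"), ("JAMES", "JIM", "30548"))

// Person.tupled turns the 3-argument constructor into a function of one 3-tuple.
val viaTupled = tuples.map(Person.tupled)

// The same conversion written out by hand with a pattern match.
val viaPattern = tuples.map { case (f, l, c) => Person(f, l, c) }

println(viaTupled == viaPattern) // true
```

The pattern-match form is longer but does not depend on the companion object extending a function type.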
Each record is paired with its global index, as a (record, index) tuple, so that the original order can be restored later:
    .zipWithIndex
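
As a small illustration of that step, the sketch below applies `zipWithIndex` to a plain list of names (the `names` and `indexed` values are only for illustration):

```scala
val names = List("sachin", "KOUN", "GANGS")

// Each element is paired with its zero-based position in the list.
val indexed = names.zipWithIndex

println(indexed) // List((sachin,0), (KOUN,1), (GANGS,2))
```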
Then we group by the key you need:
    .groupBy { case (record, order) => (record.fname, record.lname, record.code) }
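
The following sketch shows, on a reduced example, the two properties of `groupBy` that the rest of the solution depends on. The names `indexed`, `grouped` and `firstSeen` are illustrative and not part of the answer's code.

```scala
// (name, global index) pairs; "GANGS" and "SEMI" each appear twice.
val indexed = List("GANGS", "SEMI", "GANGS", "JAMES", "SEMI").zipWithIndex

val grouped: Map[String, List[(String, Int)]] =
  indexed.groupBy { case (name, _) => name }

// Within a group, elements keep the relative order of the source list.
println(grouped("GANGS")) // List((GANGS,0), (GANGS,2))

// The iteration order of the Map's keys is unspecified, so the index of
// each group's first element is what lets us restore first-appearance order.
val firstSeen = grouped.map { case (name, members) => name -> members.head._2 }
```

This is why the solution records the index of the first record of every key before flattening the groups.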
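Then, for each key/value pair produced by the grouping step, we output a list of records (or an empty list if the value holds a single record). flatMap is used so that these per-key lists are flattened into one list.
Here is the part that gets rid of single records:
    case (_, List(singleRecord)) => Nil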
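
A reduced sketch of this filtering-by-flatMap idiom follows, using hypothetical `groups` data. It also illustrates a caveat worth keeping in mind: when `flatMap` is called directly on a `Map` with a function that returns pairs, the result is again a `Map`, so results that share a first component collapse into one entry.

```scala
val groups: Map[String, List[Int]] =
  Map("GANGS" -> List(255, 110), "JAMES" -> List(300), "HULK" -> List(280, 256))

// Returning Nil for one-element groups removes them; flatMap concatenates the rest.
val repeatsOnly = groups.toList.flatMap {
  case (_, List(_))    => Nil
  case (name, amounts) => amounts.map(a => s"$name:$a")
}

// Caveat: on the Map itself, pair results are collected into a new Map.
// Here the pairs are keyed by group size, so only keys 1 and 2 survive.
val collapsed = groups.flatMap { case (_, amounts) => amounts.map(a => (amounts.size, a)) }

println(repeatsOnly.size) // 4
println(collapsed.size)   // 2
```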
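The other case builds the cumulative amounts (a List[Int]). Note for Spark developers: groupBy on Scala collections preserves the order of the value elements within a given key, which this fold relies on; Spark's RDD API documents that the ordering within a group is not guaranteed, so the approach does not carry over unchanged.
    val amounts = records.map {
      case (record, order) => record.amount.toInt
    }.foldLeft(List[Int]()) {
      case (Nil, addAmount) => List(addAmount)
      case (previousAmounts :+ lastAmount, addAmount) =>
        previousAmounts :+ lastAmount :+ (lastAmount + addAmount)
    }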
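
The running total can also be written with `scanLeft`, which may be easier to read than the fold. The sketch below is an alternative, not the answer's original code, and checks on the amounts of one key that both forms agree:

```scala
val amounts = List(20, 6750, 2600)

// The fold from the answer: append (last running total + next amount).
val viaFold = amounts.foldLeft(List[Int]()) {
  case (Nil, addAmount) => List(addAmount)
  case (previousAmounts :+ lastAmount, addAmount) =>
    previousAmounts :+ lastAmount :+ (lastAmount + addAmount)
}

// scanLeft emits every intermediate sum, starting with the seed 0, which .tail drops.
val viaScan = amounts.scanLeft(0)(_ + _).tail

println(viaFold) // List(20, 6770, 9370)
println(viaScan) // List(20, 6770, 9370)
```

Besides being shorter, `scanLeft` avoids the repeated `:+` appends, each of which copies the list built so far.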
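These amounts are zipped back with the records so that each record's amount can be replaced by the corresponding cumulative amount. The same step also serializes each record into the final desired format:
    records
      .zip(amounts)
      .zipWithIndex
      .map {
        case (((rec, order), amount), idx) =>
          val serializedRecord =
            List(rec.id, rec.code, rec.year, amount, idx + 1).mkString(",")
          (serializedRecord, (firstKeyIdx, idx))
      }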
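
For a single key, the zip-and-serialize step can be sketched as follows. The `(id, code, year)` triples are a simplified, hypothetical stand-in for the full `(Record, index)` pairs, and `cumulative` holds the running totals already computed for that key.

```scala
// One group, for the key (SEMI, GAUTAM A MR., 45873), with three records.
val group = List.fill(3)(("ID76182", "45873", "1990"))
val cumulative = List(20, 6770, 9370)

val lines = group.zip(cumulative).zipWithIndex.map {
  case (((id, code, year), amount), idx) =>
    // idx is zero-based, while the rank in the output is one-based.
    List(id, code, year, amount, idx + 1).mkString(",")
}

lines.foreach(println)
// ID76182,45873,1990,20,1
// ID76182,45873,1990,6770,2
// ID76182,45873,1990,9370,3
```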
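The previous part also zips the records with their index. Each serialized record is paired with a tuple (firstKeyIdx, idx), which is used to order the records as required: first by the order in which the key first appears (firstKeyIdx), then, for records sharing the same key, by the "nested" order defined by idx:
    .sortBy { case (serializedRecord, finalOrder) => finalOrder }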
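
Sorting by a tuple works because the standard library provides an implicit `Ordering` for tuples that compares component by component. A minimal sketch, with hypothetical labels in place of the serialized lines:

```scala
// (label, (firstKeyIdx, idx)) pairs in arbitrary order.
val unsorted = List(
  ("HULK-2", (8, 1)),
  ("GANGS-1", (2, 0)),
  ("HULK-1", (8, 0)),
  ("GANGS-2", (2, 1)),
  ("SEMI-1", (4, 0))
)

// The Ordering for (Int, Int) compares the first component,
// then uses the second one to break ties.
val ordered = unsorted.sortBy { case (_, finalOrder) => finalOrder }.map(_._1)

println(ordered) // List(GANGS-1, GANGS-2, SEMI-1, HULK-1, HULK-2)
```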