
Aggregating duplicate records while maintaining order, and also including the duplicate records

  • user4398985 · 6 years ago

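    I am trying to solve an interesting problem. A plain group-by with aggregations such as sum or count would be easy, but this problem is slightly different. Let me explain: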

    Here is my list of tuples:

    val repeatSmokers: List[(String, String, String, String, String, String)] =
      List(
        ("ID76182", "sachin", "kita MR.", "56308", "1990", "300"),
        ("ID76182", "KOUN", "Jana MR.", "56714", "1990", "100"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
        ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "200"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
        ("ID76182", "JAMES", "JIM", "30548", "1990", "300"),
        ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", "2600"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "370"),
        ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "2600"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
      )
    

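    The schema of these records is (Idnumber, name, test_code, year, amount), where the name spans two fields. From these elements I only need the repeat records. A unique combination in the list above is defined by the name and test_code, e.g. (sachin, kita MR., 56308): if the same name and test_code appear again, the row is a repeat-smoker record. For simplicity, you can assume that test_code alone is the unique value, so that a repeated test_code means a repeat-smoker record.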
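    The two key definitions above are not equivalent on this data: COOPER / ANADA shares test_code 45873 with SEMI, and DOWNES / RYAN shares 47542 with HULK, yet the expected output below only contains the SEMI and HULK rows. A minimal sketch comparing both groupings on a subset of the rows (the helpers `nameCodeKey` and `codeOnlyKey` are hypothetical names for illustration):

```scala
// A subset of the rows above: (Idnumber, fname, lname, test_code, year, amount)
type Row = (String, String, String, String, String, String)

val rows: List[Row] = List(
  ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
  ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
  ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
  ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
  ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
  ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
)

// Hypothetical key functions for the two definitions in the question.
def nameCodeKey(r: Row): (String, String, String) = (r._2, r._3, r._4)
def codeOnlyKey(r: Row): String = r._4

// Number of rows per key, keeping only keys that occur more than once.
val byNameAndCode: Map[(String, String, String), Int] =
  rows.groupBy(nameCodeKey).map { case (k, rs) => k -> rs.size }.filter(_._2 > 1)

val byCodeOnly: Map[String, Int] =
  rows.groupBy(codeOnlyKey).map { case (k, rs) => k -> rs.size }.filter(_._2 > 1)
```

    With name + test_code each repeated key has 2 rows; with test_code alone each key has 3, because COOPER and DOWNES are pulled in.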

    Here is the exact expected output:

    ID76182,27539,1990,255,1 
    ID76182,27539,1990,365,2
    ID76182,45873,1990,20,1 
    ID76182,45873,1990,6770,2 
    ID76182,45873,1990,9370,3
    ID76182,49337,1990,200,1
    ID76182,49337,1990,570,2
    ID76182,47542,1990,280,1
    ID76182,47542,1990,536,2
    

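    Finally, the most challenging part here is preserving the order, keeping a running sum for each subsequent record of a repeat smoker, and adding the occurrence count.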

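    For example, take this record: ID76182,47542,1990,536,2

    Its schema is: Idnumber, test_code, year, amount, occurrence count

    The amount 536 is the running sum 280 + 256 of the two HULK / PAIN MR. rows, and since that key occurred twice, we see the 2 above.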
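    The arithmetic behind that row can be sketched as below. This is only an illustration with the two HULK / PAIN MR. / 47542 amounts hard-coded; the names `hulkAmounts`, `steps` and `hulkLines` are hypothetical:

```scala
// Amounts of the HULK / PAIN MR. / 47542 rows, in input order.
val hulkAmounts = List(280, 256)

// Each step adds the current amount to the previous running total and
// increments the occurrence counter. The list is built in reverse, then flipped once.
val steps: List[(Int, Int)] =
  hulkAmounts.foldLeft(List.empty[(Int, Int)]) {
    case (Nil, amount)                     => List((amount, 1))
    case (acc @ ((total, n) :: _), amount) => (total + amount, n + 1) :: acc
  }.reverse

// Format as Idnumber,test_code,year,amount,occurrences
val hulkLines: List[String] =
  steps.map { case (total, n) => s"ID76182,47542,1990,$total,$n" }
```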

    Note:

    The output can be a list of any collection, but the format should be the same as shown above.

  • SergGr · 6 years ago

    Below is some Scala code, although it is really Java code written in Scala:

    import java.util.ArrayList
    import java.util.LinkedHashMap
    import scala.jdk.CollectionConverters._ // Scala 2.13+; on 2.12 use scala.collection.JavaConverters._
    
    
    type RawRecord = (String, String, String, String, String, String)
    type Record = (String, String, String, String, Int, Int)
    type RecordKey = (String, String, String, String)
    type Output = (String, String, String, String, Int, Int, Int)
    val keyF: Record => RecordKey = r => (r._1, r._2, r._3, r._4)
    val repeatSmokersRaw: List[RawRecord] =
      List(
        ("ID76182", "sachin", "kita MR.", "56308", "1990", "300"),
        ("ID76182", "KOUN", "Jana MR.", "56714", "1990", "100"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
        ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "200"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
        ("ID76182", "JAMES", "JIM", "30548", "1990", "300"),
        ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", "2600"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "370"),
        ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "2600"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
      )
    val repeatSmokers = repeatSmokersRaw.map(r => (r._1, r._2, r._3, r._4, r._5.toInt, r._6.toInt))
    
    // key => (output rows so far, occurrence count, running sum)
    val acc = new LinkedHashMap[RecordKey, (ArrayList[Output], Int, Int)]
    repeatSmokers.foreach(r => {
      val key = keyF(r)
      var cur = acc.get(key)
      if (cur == null) {
        cur = (new ArrayList[Output](), 0, 0)
      }
      val nextCnt = cur._2 + 1
      val sum = cur._3 + r._6
      val output = (r._1, r._2, r._3, r._4, r._5, sum, nextCnt)
      cur._1.add(output)
      acc.put(key, (cur._1, nextCnt, sum))
    })
    val result = acc.values().asScala.filter(p => p._2 > 1).flatMap(p => p._1.asScala)
    // or if you are clever you can merge filter and flatMap as
    // val result = acc.values().asScala.flatMap(p => if (p._1.size > 1) p._1.asScala else Nil)
    
    println(result.mkString("\n"))
    

    It prints:

    (ID76182,GANGS,SKILL,27539,1990,255,1)
    (ID76182,GANGS,SKILL,27539,1990,365,2)
    (ID76182,SEMI,GAUTAM A MR.,45873,1990,20,1)
    (ID76182,SEMI,GAUTAM A MR.,45873,1990,6770,2)
    (ID76182,SEMI,GAUTAM A MR.,45873,1990,9370,3)
    (ID76182,DRAGON,WARS,49337,1990,200,1)
    (ID76182,DRAGON,WARS,49337,1990,570,2)
    (ID76182,HULK,PAIN MR.,47542,1990,280,1)
    (ID76182,HULK,PAIN MR.,47542,1990,536,2)

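    The main trick in this code is using Java's LinkedHashMap as the accumulator collection, because it preserves insertion order. Another trick is storing a list inside each entry (I chose a Java ArrayList for the inner accumulator, but you can use any collection you like). So the idea is to build a map of key => list of smokers, together with the current counter and current sum stored for each key, so that each "aggregated" smoker can be appended to its list. Once the map is built, go through it, filter out the keys that did not accumulate at least 2 records, and then flatten the map of lists into a single list (this is where LinkedHashMap matters, because its iteration follows insertion order).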
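    For comparison, the same accumulator idea can be written with Scala's own `scala.collection.mutable.LinkedHashMap`, which also iterates in insertion order. A minimal sketch, assuming the amounts are already `Int` and using a shortened input taken from the question's rows:

```scala
import scala.collection.mutable

// Shortened sample: (Idnumber, fname, lname, test_code, year, amount), amounts already Int.
val input: List[(String, String, String, String, String, Int)] = List(
  ("ID76182", "GANGS", "SKILL", "27539", "1990", 255),
  ("ID76182", "DOWNES", "RYAN", "47542", "1990", 2090),
  ("ID76182", "DRAGON", "WARS", "49337", "1990", 200),
  ("ID76182", "GANGS", "SKILL", "27539", "1990", 110),
  ("ID76182", "DRAGON", "WARS", "49337", "1990", 370)
)

type Key = (String, String, String)

// Key => formatted output rows; iteration follows the order in which keys were first inserted.
val acc = mutable.LinkedHashMap.empty[Key, mutable.ListBuffer[String]]
// Key => (occurrence count, running sum)
val totals = mutable.Map.empty[Key, (Int, Int)]

for ((id, fname, lname, code, year, amount) <- input) {
  val key: Key = (fname, lname, code)
  val (count, sum) = totals.getOrElse(key, (0, 0))
  val (nextCount, nextSum) = (count + 1, sum + amount)
  totals(key) = (nextCount, nextSum)
  acc.getOrElseUpdate(key, mutable.ListBuffer.empty[String]) += s"$id,$code,$year,$nextSum,$nextCount"
}

// Keep only keys seen at least twice, then flatten the lists in insertion order.
val output: List[String] = acc.values.filter(_.size > 1).flatten.toList
```

    DOWNES appears once and is dropped; GANGS comes before DRAGON because it was inserted first.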

  • Xavier Guihot · 6 years ago

    Here is a functional approach to this problem.

    For this input:

    val repeatSmokers: List[(String, String, String, String, String, String)] =
      List(
        ("ID76182", "sachin", "kita MR.", "56308", "1990", "300"),
        ("ID76182", "KOUN", "Jana MR.", "56714", "1990", "100"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
        ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "200"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
        ("ID76182", "JAMES", "JIM", "30548", "1990", "300"),
        ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", "2600"),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", "370"),
        ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "2600"),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
      )
    

    using a case class to represent a record:

    case class Record(
        id: String,
        fname: String,
        lname: String,
        code: String,
        year: String,
        amount: String)
    

    we can run the following:

    val result = repeatSmokers
      .map(recordTuple => Record.tupled(recordTuple))
      .zipWithIndex
      .groupBy { case (record, order) => (record.fname, record.lname, record.code) }
      .toList // flatMap directly on the Map would build another Map and could merge equal output rows
      .flatMap {
    
        case (_, List(singleRecord)) => Nil // get rid of non-repeat records
    
        case (key, records) => {
    
          val firstKeyIdx = records.head._2
    
          val amounts = records.map {
            case (record, order) => record.amount.toInt
          }.foldLeft(List[Int]()) {
            case (Nil, addAmount) => List(addAmount)
            case (previousAmounts :+ lastAmount, addAmount) =>
              previousAmounts :+ lastAmount :+ (lastAmount + addAmount)
          }
    
          records
            .zip(amounts)
            .zipWithIndex
            .map {
              case (((rec, order), amount), idx) =>
                val serializedRecord =
                  List(rec.id, rec.code, rec.year, amount, idx + 1)
                (serializedRecord.mkString(","), (firstKeyIdx, idx))
            }
        }
      }
      .toList
      .sortBy { case (serializedRecord, finalOrder) => finalOrder }
      .map { case (serializedRecord, finalOrder) => serializedRecord }
    

    This produces:

    ID76182,27539,1990,255,1
    ID76182,27539,1990,365,2
    ID76182,45873,1990,20,1
    ID76182,45873,1990,6770,2
    ID76182,45873,1990,9370,3
    ID76182,49337,1990,200,1
    ID76182,49337,1990,570,2
    ID76182,47542,1990,280,1
    ID76182,47542,1990,536,2
    

    Some explanations:

    A very neat way to instantiate a case class from a tuple (turning the list of tuples into a list of Records):

    .map(recordTuple => Record.tupled(recordTuple))
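    One caveat, hedged: `Record.tupled` relies on the case-class companion extending a function type, which holds in Scala 2. In Scala 3 the companion no longer extends `FunctionN`, so a spelling that compiles on both is `(Record.apply _).tupled`. A minimal sketch (the name `toRecord` is illustrative):

```scala
case class Record(id: String, fname: String, lname: String, code: String, year: String, amount: String)

val tuples = List(
  ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
  ("ID76182", "GANGS", "SKILL", "27539", "1990", "110")
)

// Eta-expand the companion's apply method, then turn it into a function of one tuple.
val toRecord: ((String, String, String, String, String, String)) => Record =
  (Record.apply _).tupled

val records: List[Record] = tuples.map(toRecord)
```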
    

    Each record is paired with its global index, as (record, index), so that the original order can be used for sorting later:

    .zipWithIndex
    

    Then we group by the key you need:

    .groupBy { case (record, order) => (record.fname, record.lname, record.code) }
    

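    Then, for each key/value pair produced by the grouping stage, we output a list of records (or an empty list if the value is a single record), and flatMap flattens these lists into one.

    Here is the part that gets rid of single records: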
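    One detail worth knowing when the function passed to `flatMap` returns pairs: calling `flatMap` directly on the `Map` produced by `groupBy` builds another `Map`, so pairs with equal first components collapse into one entry. A small sketch with made-up data (`grouped`, `expand`, `viaMap` and `viaList` are illustrative names):

```scala
val grouped: Map[String, List[Int]] = Map("a" -> List(1, 2), "b" -> List(3), "c" -> List(4, 5))

// Singleton groups become Nil; larger groups emit one ("dup", value) pair per element.
val expand: ((String, List[Int])) => List[(String, Int)] = {
  case (_, List(_)) => Nil
  case (_, values)  => values.map(v => ("dup", v))
}

// On a Map, a flatMap that returns pairs builds a Map: equal keys keep only one entry.
val viaMap: Map[String, Int] = grouped.flatMap(expand)

// Converting to a List first keeps every emitted element.
val viaList: List[(String, Int)] = grouped.toList.flatMap(expand)
```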

    case (_, List(singleRecord)) => Nil
    

    The other case builds the cumulative amounts (a List of Int). Note for Spark developers: this groupBy preserves the order of the elements within each key.

    val amounts = records.map {
        case (record, order) => record.amount.toInt
      }.foldLeft(List[Int]()) {
        case (Nil, addAmount) => List(addAmount)
        case (previousAmounts :+ lastAmount, addAmount) =>
          previousAmounts :+ lastAmount :+ (lastAmount + addAmount)
      }
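    The `foldLeft` above appends with `:+`, which is linear on a `List`, so the fold is quadratic in the group size. A hedged alternative that yields the same cumulative amounts is `scanLeft`, which emits every intermediate total; dropping its initial seed with `.tail` leaves the running sums (`cumulative` is an illustrative name):

```scala
// Running sums via scanLeft: the seed 0 is emitted first, so drop it with tail.
def cumulative(amounts: List[Int]): List[Int] =
  amounts.scanLeft(0)(_ + _).tail

val semiRunning = cumulative(List(20, 6750, 2600))
```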
    

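    These amounts are zipped back onto the records so that each record's amount is replaced by the corresponding cumulative amount. The records are also serialized into the final required format: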

    records
        .zip(amounts)
        .zipWithIndex
        .map {
          case (((rec, order), amount), idx) =>
            val serializedRecord =
              List(rec.id, rec.code, rec.year, amount, idx + 1).mkString(",")
            (serializedRecord, (firstKeyIdx, idx))
        }
    

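    The previous part also zips the records with their index. Each serialized record is paired with a tuple (firstKeyIdx, idx), which is used to sort the records as required: first by the order in which each key first appears (firstKeyIdx), then, for records sharing a key, by the "nested" order given by idx: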

    .sortBy { case (serializedRecord, finalOrder) => finalOrder }
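    The sort relies on the standard lexicographic `Ordering` for tuples: pairs are compared by their first element, and the second element only breaks ties. A small sketch with (firstKeyIdx, idx) values matching the question's data (GANGS first appears at index 2, DRAGON at index 7):

```scala
// (serialized record, (index of the key's first appearance, position within the key))
val tagged = List(
  ("ID76182,49337,1990,570,2", (7, 1)),
  ("ID76182,27539,1990,255,1", (2, 0)),
  ("ID76182,49337,1990,200,1", (7, 0)),
  ("ID76182,27539,1990,365,2", (2, 1))
)

// Ordering[(Int, Int)] compares the first components, then the second.
val ordered: List[String] = tagged.sortBy(_._2).map(_._1)
```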
    
  • Xavier Guihot · 6 years ago

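    Below is a functional/recursive approach based on @SergGr's solution, which rightly introduced LinkedHashMap.

    Given this input (note that the amounts are now Int):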

    val repeatSmokers: List[(String, String, String, String, String, Int)] =
      List(
        ("ID76182", "sachin", "kita MR.", "56308", "1990", 300),
        ("ID76182", "KOUN", "Jana MR.", "56714", "1990", 100),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", 255),
        ("ID76182", "GANGS", "SKILL", "27539", "1990", 110),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 20),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 6750),
        ("ID76182", "DOWNES", "RYAN", "47542", "1990", 2090),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", 200),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", 280),
        ("ID76182", "JAMES", "JIM", "30548", "1990", 300),
        ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", 2600),
        ("ID76182", "DRAGON", "WARS", "49337", "1990", 370),
        ("ID76182", "COOPER", "ANADA", "45873", "1990", 2600),
        ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 2600),
        ("ID76182", "HULK", "PAIN MR.", "47542", "1990", 256)
      )
    

    First prepare the data, then aggregate it:

    case class Record(
      id: String, fname: String, lname: String,
      code: String, year: String, var amount: Int
    )
    
    case class Key(fname: String, lname: String, code: String)
    
    val preparedRecords: List[(Key, Record)] = repeatSmokers.map {
      case recordTuple @ (_, fname, lname, code, _, _) =>
        (Key(fname, lname, code), Record.tupled(recordTuple))
    }
    


    import scala.collection.mutable.LinkedHashMap
    
    def aggregateDuplicatesWithOrder(
        remainingRecords: List[(Key, Record)],
        processedRecords: LinkedHashMap[Key, List[Record]]
    ): LinkedHashMap[Key, List[Record]] =
      remainingRecords match {
    
        case (key, record) :: newRemainingRecords => {
    
          processedRecords.get(key) match {
            case Some(recordList :+ lastRecord) => {
              record.amount = record.amount + lastRecord.amount
              processedRecords.update(key, recordList :+ lastRecord :+ record)
            }
            case None => processedRecords(key) = List(record)
          }
    
          aggregateDuplicatesWithOrder(newRemainingRecords, processedRecords)
        }
    
        case Nil => processedRecords
      }
    
    val result = aggregateDuplicatesWithOrder(
      preparedRecords, LinkedHashMap[Key, List[Record]]()
    ).values.flatMap {
      case _ :: Nil => Nil
      case records =>
        records.zipWithIndex.map { case (rec, idx) =>
          List(rec.id, rec.code, rec.year, rec.amount, idx + 1).mkString(",")
        }
    }
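    The recursion above mutates `Record.amount` through a `var`. A hedged variant of the same idea keeps `Record` immutable, folds over the input instead of recursing, and stores a copy carrying the running total. It still uses a mutable `LinkedHashMap` as the accumulator, and the input is a shortened sample of the question's rows:

```scala
import scala.collection.mutable.LinkedHashMap

case class Record(id: String, fname: String, lname: String, code: String, year: String, amount: Int)
case class Key(fname: String, lname: String, code: String)

val input: List[Record] = List(
  Record("ID76182", "GANGS", "SKILL", "27539", "1990", 255),
  Record("ID76182", "DOWNES", "RYAN", "47542", "1990", 2090),
  Record("ID76182", "HULK", "PAIN MR.", "47542", "1990", 280),
  Record("ID76182", "GANGS", "SKILL", "27539", "1990", 110),
  Record("ID76182", "HULK", "PAIN MR.", "47542", "1990", 256)
)

// Fold into a LinkedHashMap so keys keep their first-seen order.
// Each record is stored as a copy whose amount is the running total for its key.
val grouped: LinkedHashMap[Key, List[Record]] =
  input.foldLeft(LinkedHashMap.empty[Key, List[Record]]) { (acc, rec) =>
    val key = Key(rec.fname, rec.lname, rec.code)
    val previous = acc.getOrElse(key, Nil)
    val total = previous.lastOption.map(_.amount).getOrElse(0) + rec.amount
    acc.update(key, previous :+ rec.copy(amount = total))
    acc
  }

// Drop keys seen once, then serialize as Idnumber,test_code,year,amount,occurrence.
val lines: List[String] = grouped.values.toList.flatMap {
  case _ :: Nil => Nil
  case records =>
    records.zipWithIndex.map { case (r, i) => s"${r.id},${r.code},${r.year},${r.amount},${i + 1}" }
}
```

    `lines.foreach(println)` then prints the GANGS rows (255, 365) followed by the HULK rows (280, 536); DOWNES is dropped because it occurs once.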