代码之家  ›  专栏  ›  技术社区  ›  Chuang

如何通过另一个小数据帧(逐行)过滤一个大数据帧多次(等于小数据帧的行计数)

  •  -1
  • Chuang  · 技术社区  · 6 年前

    我有两个火花数据框 dfA dfB . 我想过滤 DFA 通过 DFB 每行的,这意味着如果 DFB 有10000行,我需要用10000个不同的过滤条件过滤DFA 10000次 DFB . 然后,在每个筛选之后,我需要将筛选结果作为列收集到 .

    dfA                                    dfB
    +------+---------+---------+           +-----+-------------+--------------+
    |  id  |  value1 |  value2 |           | id  |  min_value1 |  max_value1  |
    +------+---------+---------+           +-----+-------------+--------------+            
    |  1   |    0    |   4345  |           |  1  |     0       |       3      |
    |  1   |    1    |   3434  |           |  1  |     5       |       9      |
    |  1   |    2    |   4676  |           |  2  |     1       |       4      |
    |  1   |    3    |   3454  |           |  2  |     6       |       8      |
    |  1   |    4    |   9765  |           +-----+-------------+--------------+
    |  1   |    5    |   5778  |           ....more rows, nearly 10000 rows.
    |  1   |    6    |   5674  |
    |  1   |    7    |   3456  |
    |  1   |    8    |   6590  |
    |  1   |    9    |   5461  |
    |  1   |    10   |   4656  |
    |  2   |    0    |   2324  |
    |  2   |    1    |   2343  |
    |  2   |    2    |   4946  |
    |  2   |    3    |   4353  |
    |  2   |    4    |   4354  |
    |  2   |    5    |   3234  |
    |  2   |    6    |   8695  |
    |  2   |    7    |   6587  |
    |  2   |    8    |   5688  |
    +------+---------+---------+
    ......more rows,nearly one billons rows
    

    所以我期望的结果是

    resultDF
    +-----+-------------+--------------+----------------------------+
    | id  |  min_value1 |  max_value1  |          results           |
    +-----+-------------+--------------+----------------------------+            
    |  1  |     0       |       3      | [4345,3434,4676,3454]      |
    |  1  |     5       |       9      | [5778,5674,3456,6590,5461] |
    |  2  |     1       |       4      | [2343,4946,4353,4354]      |
    |  2  |     6       |       8      | [8695,6587,5688]           |
    +-----+-------------+--------------+----------------------------+
    

    我愚蠢的解决办法是

    def tempFunction(id:Int,dfA:DataFrame,dfB:DataFrame): DataFrame ={
        val dfa = dfA.filter("id ="+ id)
        val dfb = dfB.filter("id ="+ id)
        val arr = dfb.groupBy("id")
                     .agg(collect_list(struct("min_value1","max_value1"))
                     .collect()
    
        val rangArray = arr(0)(1).asInstanceOf[Seq[Row]]   // get range array of id 
        // initial a resultDF to store each query's results
        val min_value1 = rangArray(0).get(0).asInstanceOf[Int]
        val max_value1 = rangArray(0).get(1).asInstanceOf[Int]
        val s = "value1 between "+min_value1+" and "+ max_value1
        var resultDF = dfa.filter(s).groupBy("id")
                                      .agg(collect_list("value1").as("results"),
                                       min("value1").as("min_value1"),
                                       max("value1").as("max_value1"))
        for( i <-1 to timePairArr.length-1){
           val temp_min_value1 = rangArray(0).get(0).asInstanceOf[Int]
           val temp_max_value1 = rangArray(0).get(1).asInstanceOf[Int]
           val query = "value1 between "+temp_min_value1+" and "+ temp_max_value1
           val tempResultDF = dfa.filter(query).groupBy("id")
                                      .agg(collect_list("value1").as("results"),
                                       min("value1").as("min_value1"),
                                       max("value1").as("max_value1"))
           resultDF = resultDF.union(tempResultDF)
           }
    
      return resultDF
    }
    
    def myFunction():DataFrame = {
      val dfA = spark.read.parquet(routeA)
      val dfB = spark.read.parquet(routeB)
    
      val idArrays = dfB.select("id").distinct().collect()
      // initial result
      var resultDF = tempFunction(idArrays(0).get(0).asInstanceOf[Int],dfA,dfB)
       //tranverse all id 
      for(i<-1 to idArrays.length-1){  
         val tempDF = tempFunction(idArrays(i).get(0).asInstanceOf[Int],dfA,dfB)
         resultDF = resultDF.union(tempDF)
      }
      return resultDF
    }
    

    也许你不想看到我的蛮力代码,我的想法是

     finalResult = null;
     for each id in dfB:
        for query condition of this id:
             tempResult = query dfA 
             union tempResult to finalResult
    

    我试过我的算法,它花了将近50个小时。

    有人有更有效的方法吗?非常感谢。

    2 回复  |  直到 6 年前
        1
  •  1
  •   wandermonk    6 年前

    假设您的DFB是小数据集,我尝试给出以下解决方案。

    尝试使用 Broadcast Join 如下所示

    import org.apache.spark.sql.functions.broadcast
    
    dfA.join(broadcast(dfB), col("dfA.id") === col("dfB.id") && col("dfA.value1") >= col("dfB.min_value1") && col("dfA.value1") <= col("dfB.max_value1")).groupBy(col("dfA.id")).agg(collect_list(struct("value2").as("results"));
    

    BroadcastJoin 就像一个 Map Side Join . 这将把较小的数据具体化为所有映射器。这将通过在减少步骤中省略所需的排序和无序排列阶段来提高性能。

    我希望您避免以下几点:

    从不使用 collect() . 在RDD上发出收集操作时,数据集被复制到驱动程序。

    如果您的数据太大,您可能会得到内存超出界限的异常。

    尝试使用 take() takeSample() 相反。

        2
  •  0
  •   Ramesh Maharjan    6 年前

    很明显 当计算中涉及两个数据帧/数据集时,应执行联接。 . 所以 参加 是你必须的一步。但你什么时候加入是一个重要的问题。

    我建议你 在连接前尽可能聚合和减少数据帧中的行,因为这样可以减少混乱。 .

    当需要精确的dfb时,您只能减少dfa,并且从dfa添加的列满足条件。

    所以你可以 groupBy 身份证件 和聚合 DFA 这样你就能得到一排 身份证件 ,然后可以执行 参加 . 然后你可以使用 udf 用于计算逻辑的函数

    为了清楚和解释,提供了注释。

    import org.apache.spark.sql.functions._
    //udf function to filter only the collected value2 which has value1 within range of min_value1 and max_value1 
    def selectRangedValue2Udf = udf((minValue: Int, maxValue: Int, list: Seq[Row])=> list.filter(row => row.getAs[Int]("value1") <= maxValue && row.getAs[Int]("value1") >= minValue).map(_.getAs[Int]("value2")))
    
    
    dfA.groupBy("id")              //grouping by id
      .agg(collect_list(struct("value1", "value2")).as("collection"))  //collecting all the value1 and value2 as structs
      .join(dfB, Seq("id"), "right")          //joining both dataframes with id
      .select(col("id"), col("min_value1"), col("max_value1"), selectRangedValue2Udf(col("min_value1"), col("max_value1"), col("collection")).as("results"))  //calling the udf function defined above
    

    哪个应该给你

    +---+----------+----------+------------------------------+
    |id |min_value1|max_value1|results                       |
    +---+----------+----------+------------------------------+
    |1  |0         |3         |[4345, 3434, 4676, 3454]      |
    |1  |5         |9         |[5778, 5674, 3456, 6590, 5461]|
    |2  |1         |4         |[2343, 4946, 4353, 4354]      |
    |2  |6         |8         |[8695, 6587, 5688]            |
    +---+----------+----------+------------------------------+
    

    我希望答案有帮助