
Spark Scala - aggregation and pivoting based on time period

  •  1
  • ashok viswanathan  · Tech Community  · 7 years ago

    I am trying to implement a SQL Server-style pivot in Spark.

    So far I have been using sqlContext and applying all the transformations in SQL. I would like to know whether I can pull the data directly from SQL Server and implement the pivot functionality with Spark.
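
    For the "pull the data directly from SQL Server" part, the usual route is Spark's JDBC data source. The sketch below is only illustrative: the URL, table name and credentials are placeholders, it assumes the Microsoft SQL Server JDBC driver is on the classpath, and it assumes a SparkSession named spark (with an older sqlContext, sqlContext.read works the same way).

    // Read a SQL Server table straight into a DataFrame over JDBC.
    // All connection details below are placeholders.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("dbtable", "dbo.SomeTable")   // or a sub-query aliased as a table
      .option("user", "<user>")
      .option("password", "<password>")
      .load()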

    Below is an example of what I am trying to achieve. The SQL Server query:

    create table #temp(ID Int, MonthPrior int, Amount float);

    insert into #temp values (100,1,10),(100,2,20),(100,3,30),(100,4,10),(100,5,20),(100,6,60),(200,1,10),(200,2,20),(200,3,30),(300,4,10),(300,5,20),(300,6,60);

    select * from #temp;

    | ID | MonthPrior | Amount |
    |----|------------|--------|
    |100 |1 |10|
    |100 |2 |20|
    |100 |3 |30|
    |100 |4 |10|
    |100 |5 |20|
    |100 |6 |60|
    |200 |1 |10|
    |200 |2 |20|
    |200 |3 |30|
    |300 |4 |10|
    |300 |5 |20|
    |300 |6 |60|

    Select ID,
           coalesce([1],0) as Amount1Mth,
           coalesce([1],0)+coalesce([2],0)+coalesce([3],0) as Amount1to3Mth,
           coalesce([1],0)+coalesce([2],0)+coalesce([3],0)+coalesce([4],0)+coalesce([5],0)+coalesce([6],0) as Amount_AllMonths
    from (select * from #temp) A
    pivot ( sum(Amount) for MonthPrior in ([1],[2],[3],[4],[5],[6]) ) as Pvt

    | ID | Amount1Mth | Amount1to3Mth | Amount_AllMonths |
    |----|------------|---------------|------------------|
    |100 |10 |60 |150|
    |200 |10 |60 |60|
    |300 |0 |0 |90|
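
    For reference, a rough Spark DataFrame equivalent of that query could look like the sketch below. It assumes the data is already in a DataFrame df with columns ID, MonthPrior and Amount (as in the replies that follow) and leans on the built-in pivot, which the third reply also uses.

    import org.apache.spark.sql.functions._

    // Pivot MonthPrior into columns "1".."6", fill missing months with 0,
    // then express the rolling totals as plain column sums.
    val pivoted = df.groupBy("ID")
      .pivot("MonthPrior", Seq(1, 2, 3, 4, 5, 6))
      .sum("Amount")
      .na.fill(0)

    val result = pivoted.select(
      col("ID"),
      col("1").as("Amount1Mth"),
      (col("1") + col("2") + col("3")).as("Amount1to3Mth"),
      (col("1") + col("2") + col("3") + col("4") + col("5") + col("6")).as("Amount_AllMonths")
    )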

    3 Replies  |  7 years ago
        1
  •  2
  •   Leo C    7 years ago

    If Amount is of Decimal type, it is better to use java.math.BigDecimal as the corresponding parameter type. Note that the methods + and sum are no longer applicable, so they are replaced with add and reduce, respectively.

    import org.apache.spark.sql.functions._
    import java.math.BigDecimal
    
    val df = Seq(
      (100, 1, new BigDecimal(10)),
      (100, 2, new BigDecimal(20)),
      (100, 3, new BigDecimal(30)),
      (100, 4, new BigDecimal(10)),
      (100, 5, new BigDecimal(20)),
      (100, 6, new BigDecimal(60)),
      (200, 1, new BigDecimal(10)),
      (200, 2, new BigDecimal(20)),
      (200, 3, new BigDecimal(30)),
      (300, 4, new BigDecimal(10)),
      (300, 5, new BigDecimal(20)),
      (300, 6, new BigDecimal(60))
    ).toDF("ID", "MonthPrior", "Amount")
    
    // UDF to combine 2 array-type columns to map
    def arrayToMap = udf(
      (a: Seq[Int], b: Seq[BigDecimal]) => (a zip b).toMap
    )
    
    // Create array columns which get zipped into a map
    val df2 = df.groupBy("ID").agg(
      collect_list(col("MonthPrior")).as("MonthList"),
      collect_list(col("Amount")).as("AmountList")
    ).select(
      col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap")
    )
    
    // UDF to sum map values for keys from 1 thru n (0 for all)
    def sumMapValues = udf(
      (m: Map[Int, BigDecimal], n: Int) =>
        if (n > 0)
          m.collect{ case (k, v) => if (k <= n) v else new BigDecimal(0) }.reduce(_ add _)
        else
          m.collect{ case (k, v) => v }.reduce(_ add _)
    )
    
    val df3 = df2.withColumn( "Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1)) ).
      withColumn( "Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3)) ).
      withColumn( "Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0)) ).
      select( col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths") )
    
    df3.show(truncate=false)
    +---+--------------------+--------------------+--------------------+
    | ID|          Amount1Mth|       Amount1to3Mth|    Amount_AllMonths|
    +---+--------------------+--------------------+--------------------+
    |300|               0E-18|               0E-18|90.00000000000000...|
    |100|10.00000000000000...|60.00000000000000...|150.0000000000000...|
    |200|10.00000000000000...|60.00000000000000...|60.00000000000000...|
    +---+--------------------+--------------------+--------------------+
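    If the full default scale in the output above is not wanted, the result columns can be cast down before display. A minimal tidy-up sketch; the target type decimal(18,2) is just an example:

    // Optional: cast the BigDecimal results to a fixed scale for display.
    val df4 = df3.select(
      col("ID"),
      col("Amount1Mth").cast("decimal(18,2)").as("Amount1Mth"),
      col("Amount1to3Mth").cast("decimal(18,2)").as("Amount1to3Mth"),
      col("Amount_AllMonths").cast("decimal(18,2)").as("Amount_AllMonths")
    )
    df4.show(false)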
    
        2
  •  1
  •   Leo C    7 years ago

    One approach is to aggregate MonthPrior and Amount into a map-type column per ID, then apply a UDF that sums the map values whose keys fall in the wanted range.

    val df = Seq(
      (100, 1, 10),
      (100, 2, 20),
      (100, 3, 30),
      (100, 4, 10),
      (100, 5, 20),
      (100, 6, 60),
      (200, 1, 10),
      (200, 2, 20),
      (200, 3, 30),
      (300, 4, 10),
      (300, 5, 20),
      (300, 6, 60)
    ).toDF("ID", "MonthPrior", "Amount")
    
    import org.apache.spark.sql.functions._
    
    // UDF to combine 2 array-type columns to map
    def arrayToMap = udf(
      (a: Seq[Int], b: Seq[Int]) => (a zip b).toMap
    )
    
    // Aggregate columns into arrays and apply arrayToMap UDF to create map column
    val df2 = df.groupBy("ID").agg(
      collect_list(col("MonthPrior")).as("MonthList"),
      collect_list(col("Amount")).as("AmountList")
    ).select(
      col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap")
    )
    
    // UDF to sum map values for keys from 1 thru n (0 for all)
    def sumMapValues = udf(
      (m: Map[Int, Int], n: Int) =>
        if (n > 0) m.collect{ case (k, v) => if (k <= n) v else 0 }.sum else
          m.collect{ case (k, v) => v }.sum
    )
    
    // Apply sumMapValues UDF to the map column
    val df3 = df2.withColumn( "Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1)) ).
      withColumn( "Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3)) ).
      withColumn( "Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0)) ).
      select( col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths") )
    
    df3.show
    +---+----------+-------------+----------------+
    | ID|Amount1Mth|Amount1to3Mth|Amount_AllMonths|
    +---+----------+-------------+----------------+
    |300|         0|            0|              90|
    |100|        10|           60|             150|
    |200|        10|           60|              60|
    +---+----------+-------------+----------------+
    
        3
  •  0
  •   ashok viswanathan    7 years ago

    Thanks @LeoC. The solutions above work. I also tried the following approach -

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    
    // Distinct MonthPrior values, sorted ascending and collected to the driver
    lazy val months = (((df select ($"MonthPrior") distinct) sort ($"MonthPrior".asc)).rdd map (_.getAs[Int](0)) collect).toList
    
    // (start, end, alias) slices of the month list to roll up into summed columns
    lazy val sliceSpec = List((0, 2, "1-2"), (0, 3, "1-3"), (0, 4, "1-4"), (0, 5, "1-5"), (0, 6, "1-6"))
    
    // Given the month list, build a Column that sums the pivoted month columns in a slice
    lazy val createGroup: List[Any] => ((Int, Int, String) => Column) = sliceMe => (start, finish, aliasName) =>
      sliceMe slice (start, finish) map (value => col(value.toString)) reduce (_ + _) as aliasName
    
    lazy val grouper = createGroup(months).tupled
    
    lazy val groupedCols = sliceSpec map (group => grouper(group))
    
    // Pivot MonthPrior into one column per month, then append the rolled-up sums
    val pivoted = df groupBy ($"ID") pivot ("MonthPrior") agg (sum($"Amount"))
    
    val writeMe = pivoted select ((pivoted.columns map col) ++ (groupedCols): _*)
    
    // z.show displays the result (Zeppelin context)
    z.show(writeMe sort ($"ID".asc))