代码之家  ›  专栏  ›  技术社区  ›  Joshua Howard

火花聚合器无法正常工作

  •  4
  • Joshua Howard  · 技术社区  · 6 年前

    我想在Scala Spark中试用聚合器,但我似乎无法让它们同时使用 select 功能和 groupBy/agg 功能(使用我当前的实现 agg 函数无法编译)。我的聚合器写在下面,应该不言自明。

    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.{Encoder, Encoders}
    
    /** Stores the number of true counts (tc) and false counts (fc) */
    case class Counts(var tc: Long, var fc: Long)
    
    /** Count the number of true and false occurances of a function */
    class BooleanCounter[A](f: A => Boolean) extends Aggregator[A, Counts, Counts] with Serializable {
      // Initialize both counts to zero
      def zero: Counts = Counts(0L, 0L) 
      // Sum counts for intermediate value and new value
      def reduce(acc: Counts, other: A): Counts = { 
        if (f(other)) acc.tc += 1 else acc.fc += 1
        acc 
      }
      // Sum counts for intermediate values
      def merge(acc1: Counts, acc2: Counts): Counts = { 
        acc1.tc += acc2.tc
        acc1.fc += acc2.fc
        acc1
      }
      // Return results
      def finish(acc: Counts): Counts = acc 
      // Encoder for intermediate value type
      def bufferEncoder: Encoder[Counts] = Encoders.product[Counts]
      // Encoder for return type
      def outputEncoder: Encoder[Counts] = Encoders.product[Counts]
    }
    

    下面是我的测试代码。

    val ds: Dataset[Employee] = Seq(
      Employee("John", 110),
      Employee("Paul", 100),
      Employee("George", 0), 
      Employee("Ringo", 80) 
    ).toDS()
    
    val salaryCounter = new BooleanCounter[Employee]((r: Employee) => r.salary < 10).toColumn
    // Usage works fine 
    ds.select(salaryCounter).show()
    // Causes an error
    ds.groupBy($"name").agg(salaryCounter).show()
    

    首次使用 salaryCounter 工作正常,但第二个会导致以下编译错误。

    java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Employee 
    

    Databricks有一个 tutorial 这相当复杂,但似乎是Spark 2.3。还有 this 使用Spark 1.6中的实验功能的旧教程。

    1 回复  |  直到 6 年前
        1
  •  4
  •   zero323 little_kid_pea    6 年前

    您错误地混合了“静态类型”和“动态类型”API。要使用以前的版本,应调用 agg 在…上 KeyValueGroupedDataset RelationalGroupedDataset :

    ds.groupByKey(_.name).agg(salaryCounter)