代码之家  ›  专栏  ›  技术社区  ›  Kalpesh

如何将GroupedDataset保存到parquet或将其转换为toDF

  •  1
  • Kalpesh  · 技术社区  · 7 年前

    我正在使用spark 1.6.1。

    是否有任何API可用于将GroupDataset保存到拼花文件。

    E、 我有一个自定义对象“Procedure”,我已经将数据帧转换为Procedure对象。 之后,我在patientID上进行分组。 我想将DDS分组到拼花文件中,或者将其作为数据帧传递给其他函数。 我没有为存储获取任何API,也没有将其转换为数据帧。

    val procedureDs: Dataset[Procedure] = joinDf.select("patientid", "patientprocedureid", "procedurecode").as[Procedure]
    val groupedDs:GroupedDataset[Long, Procedure] = procedureDs.groupBy{ x => x.patientid } 
    

    应用地图组后

    val a = groupedDs.mapGroups{ case (k,vs) => { (k, vs.toSeq)}}
    

    它给出了以下错误:

    Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.....PatientDiagnosis
    - array element class: "com....PatientDiagnosis"
    - field (class: "scala.collection.Seq", name: "_2")
    - root class: "scala.Tuple2"
    

    我试图给出明确的编码器

    val a = groupedDigDs.mapGroups((k,vs) =>  (k, vs.toSeq))(org.apache.spark.sql.Encoders.bean(classOf[(Long, Seq[com....PatientDiagnosis])]))
    

    然后错误更改为:

     java.lang.UnsupportedOperationException: Cannot infer type for class scala.Tuple2 because it is not bean-compliant
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   zero323 little_kid_pea    7 年前

    等同于 GroupedData ( RelationalGroupedDataset 在Spark 2中。x) , GroupedDataset ( KeyValueGroupedDataset 在Spark 2中。x) 必须在保存之前进行聚合。

    如果你的目标是另一个 groupByKey 您可以使用 mapGroups :

    val groupedDs: GroupedDataset[K, V] = ???
    // ... { case (k, xs) => (k, xs.toSeq) }  to preserve key as well
    groupedDs.mapGroups { case (_, xs) => xs.toSeq }