代码之家  ›  专栏  ›  技术社区  ›  Niyaz

Scala:如何将任何泛型序列作为此方法的输入

  •  0
  • Niyaz  · 技术社区  · 5 年前

    我是斯卡拉·努布。还在努力学习语法。

    我试图减少将测试数据转换为数据帧所需编写的代码。以下是我现在拥有的:

      def makeDf[T](seq: Seq[(Int, Int)], colNames: String*): Dataset[Row] = {
        val context = session.sqlContext
        import context.implicits._
        seq.toDF(colNames: _*)
      }
    

    问题在于,上述方法仅采用形状的序列 Seq[(Int, Int)] 作为输入。如何使其接受任何序列作为输入?我可以将输入形状更改为 Seq[AnyRef] ,但代码无法识别 toDF 作为有效符号调用。

    我不知道该怎么做。有什么想法吗?谢谢

    2 回复  |  直到 5 年前
        1
  •  5
  •   Assaf Mendelson    5 年前

    简短答复:

    import scala.reflect.runtime.universe.TypeTag
    
    def makeDf[T <: Product: TypeTag](seq: Seq[T], colNames: String*): DataFrame = ...
    

    说明:

    implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
      DatasetHolder(_sqlContext.createDataset(s))
    }
    

    这反过来又需要生成一个编码器。问题是编码器只在某些类型上定义。特别是产品(如元组、case类等),您还需要添加类型标签隐式,以便Scala能够克服类型擦除(在运行时,所有序列都具有类型序列,而不管泛型类型如何。类型标签提供了这方面的信息)。

    作为侧节点,您不需要从会话中提取sqlcontext,只需使用:

    import sparkSession.implicits._
    
        2
  •  1
  •   Luis Miguel Mejía Suárez    5 年前

    正如@AssafMendelson已经解释了为什么不能创建 Dataset 属于 Any 是因为 火花 Encoder 转化 物体 从他们 虚拟机 代表 内部 -及 火花 编码器 对于 任何 类型


    然而,依我看,它限制太多,因为它只适用于 Products (元组和case类) -即使这包括了大多数用例,仍然有一些被排除在外。

    编码器 ,您可以将该责任留给客户。在大多数情况下只需要打电话 import spark.implicits._
    因此,我认为这将是最普遍的解决办法。

    import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}
    
    // Implicit SparkSession to make the call to further methods more transparent.
    implicit val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._
    
    def makeDf[T: Encoder](seq: Seq[T], colNames: String*)
                          (implicit spark: SparkSession): DataFrame =
      spark.createDataset(seq).toDF(colNames: _*)
    
    def makeDS[T: Encoder](seq: Seq[T])
                          (implicit spark: SparkSession): Dataset[T] =
      spark.createDataset(seq)
    

    注: 这基本上是从Spark重新创建已经定义的函数。