代码之家  ›  专栏  ›  技术社区  ›  Théo Bontempelli

导入Spark中的隐式项无效

  •  1
  • Théo Bontempelli  · 技术社区  · 7 年前

    我试图创建一个特性,将数据从配置单元表加载到类型化数据集。代码如下:

    import org.apache.spark.sql.{Dataset, Row, SparkSession}
    
    trait PartitionedHiveTableLoader[T] {
      def context: String
      def table: String
      def returnEntity: Row => T
      def load(sparkSession: SparkSession, token: String): Dataset[T] = {
        import sparkSession.implicits._
        sparkSession.sql(s"SELECT * from $context.$table where d = $token").
          map(returnEntity(_))
      }
      def load(sparkSession: SparkSession, lowBound: String, upperBound: String, includeLow: Boolean = true, includeUpper: Boolean = true): Dataset[T] = {
        import sparkSession.implicits._
        sparkSession.sql(s"SELECT * " +
          s"from $context.$table " +
          s"where d >${if(includeLow)"=" else ""} $lowBound " +
          s"and d<${if(includeUpper)"=" else ""} $upperBound").
          map(returnEntity(_))
      }
    }
    

    然后此特征用于对象,如下所示:

    import org.apache.spark.sql.Row
    
    object FreeUsersRightsLoader extends {} with PartitionedHiveTableLoader[FreeUsersRightsEntity] {
      def context: String = "analytics"
      def table: String =  "free_users_rights"
      def returnEntity: Row => FreeUsersRightsEntity = x => FreeUsersRightsDataset(x)
    }
    

    但当我用mvn包编译它时,我有以下错误:

    错误:找不到数据集中存储的类型的编码器。导入spark支持基本类型(Int、String等)和产品类型(case类)。隐含。\u在将来的版本中将添加对序列化其他类型的支持。

    但我引进了火花。在每个方法中都隐含。。。 有人知道问题出在哪里吗?

    1 回复  |  直到 6 年前
        1
  •  3
  •   Tzach Zohar    7 年前

    隐式 Encoder 对于类型 T 对于正在使用的方法,必须在编译时可用。

    导入时 import sparkSession.implicits._ ,您实际上为许多 已知的 常见类型(例如字符串、Long、数组、case类等),但- T 未知的 未绑定 ,所以可能是 任何东西 ,并且没有任何类具有内置编码器-因此此导入没有用处。

    要解决这个问题,您应该添加 隐式编码器参数 方法签名:

    def load(sparkSession: SparkSession, token: String)(implicit enc: Encoder[T]): Dataset[T] = {
      sparkSession.sql(s"SELECT * from $context.$table where d = $token").
        map(returnEntity(_))
    }
    
    def load(sparkSession: SparkSession,
             lowBound: String,
             upperBound: String,
             includeLow: Boolean = true,
             includeUpper: Boolean = true)(implicit enc: Encoder[T]): Dataset[T] = {
      sparkSession.sql(s"SELECT * " +
        s"from $context.$table " +
        s"where d >${if(includeLow)"=" else ""} $lowBound " +
        s"and d<${if(includeUpper)"=" else ""} $upperBound").
        map(returnEntity(_))
    }
    

    然后,无论这些方法位于何处,都需要内置的隐式 打电话 -已知T型 FreeUsersRightsEntity (我假设它是其中一个内置类,例如包含原语和集合的case类):

    import spark.implicits._
    
    FreeUsersRightsLoader.load(spark, "token")