
createDataFrame gets the message java.lang.String cannot be cast to java.sql.Date

  •  -1
  • tree em  · Tech Community  · 6 years ago

    I am trying to merge the header into the single-file CSV output (ref, by @Kang):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{StructField, StringType, StructType}
    
    object ListOfSavingFiltered {
      def merge(srcPath: String, dstPath: String): Unit = {
        val hadoopConfig = new Configuration()
        val hdfs = FileSystem.get(hadoopConfig)
        FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
        // the "true" setting deletes the source files once they are merged into the new output
      }
    
      def main(args: Array[String]): Unit = {
    
        val url = "jdbc:sqlserver://localhost;databaseName=InsightWarehouse;integratedSecurity=true";
        val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    
        val v_Account = "dbo.v_Account"
        val v_Customer = "dbo.v_Customer"
    
        val spark = SparkSession
          .builder
          .master("local[*]")
          //.config("spark.debug.maxToStringFields", "100")
          .appName("Insight Application Big Data")
          .getOrCreate()
    
    
        val dfAccount = spark
          .read
          .format("jdbc")
          .option("url", url)
          .option("driver", driver)
          .option("dbtable", v_Account)
          .load()
    
        val dfCustomer = spark
          .read
          .format("jdbc")
          .option("url", url)
          .option("driver", driver)
          .option("dbtable", v_Customer)
          .load()
    
        val Classification = Seq("Contractual Account", "Non-Term Deposit", "Term Deposit")
    
        //dfAccount.printSchema()
        val joined = dfAccount.as("a")
          .join(dfCustomer.as("c"),
            Seq("BusinessDate", "CustomerID"), "LEFT")
          .filter(
            dfAccount.col("BusinessDate") === "2018-11-28"
              && dfAccount.col("Category") === "Deposit"
              // && dfAccount.col("IsActive").equalTo("Yes")
              && dfAccount.col("Classification").isin(Classification: _*)
             )
    
        //joined.show()
        val columnNames = Seq[String](
          "a.AcctBranchName",
          "c.CustomerNum",
          "c.SourceCustomerId",
          "a.SourceAccountId",
          "a.AccountNum",
          "c.FullName",
          "c.LastName",
          "c.BirthDate",
          "a.Balance",
          "a.InterestAccrued",
          "a.InterestRate",
          "a.SpreadRate",
          "a.Classification",
          "a.ProductType",
          "a.ProductDesc",
          "a.StartDate",
          "a.MaturityDate",
          "a.ClosedDate",
          "a.FixOrVar",
          "a.Term",
          "a.TermUnit",
          "a.MonthlyNetIncome",
          "a.Status_",
          "a.HoldsTotal",
          "a.AvailableFunds",
          "a.InterestRateIndex",
          "a.InterestRateVariance",
          "a.FeePlan",
          "c.CustEmplFullName",
          "a.IsActive",
          "c.Residence",
          "c.Village",
          "c.Province",
          "c.Commune",
          "c.District",
          "a.Currency",
          "c.TaxType",
          "c.TaxRate",
          "RollOverStatus"
        )
    
        val outputfile = "src/main/resources/out"
        val filename = "lifOfSaving.csv.gz"
        val outputFileName = outputfile + "/temp_" + filename
        val mergedFileName = outputfile + "/merged_" + filename
        val mergeFindGlob = outputFileName
    
        val responseWithSelectedColumns = joined.select(columnNames.map(c => col(c)): _*)
          .withColumn("RollOverStatus", when(col("RollOverStatus").equalTo("Y"), "Yes").otherwise("No"))
    
    
        //create a new data frame containing only header names
        import scala.collection.JavaConverters._
        val headerDF = spark.createDataFrame(List(Row.fromSeq(responseWithSelectedColumns.columns.toSeq)).asJava, responseWithSelectedColumns.schema)
    
    
        //merge header names with data
        headerDF.union(responseWithSelectedColumns)
          // .coalesce(1) //So just a single part- file will be created
          .repartition(4)
          .write.mode("overwrite")
          .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
          .format("com.databricks.spark.csv")
          .option("charset", "UTF8")
          .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") //Avoid creating of crc files
          .option("header", "false") //Write the header
    
          .save(outputFileName)
        merge(mergeFindGlob, mergedFileName)
        responseWithSelectedColumns.unpersist()
    
        spark.stop()
      }
    }
    

    The code looks correct, but I still get an error message like the following:

    Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Date
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:300)
    

    Can anyone help?
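
    The stack trace points at the headerDF line: responseWithSelectedColumns.schema declares typed columns, but Row.fromSeq(responseWithSelectedColumns.columns.toSeq) fills the row with the column names as plain strings, so the Catalyst converter fails as soon as it hits a date-typed column. A minimal sketch of that mismatch (assuming an existing SparkSession named spark, with BirthDate standing in for any date-typed column of the view):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DateType, StructField, StructType}
    import scala.collection.JavaConverters._

    // One DateType column, as the real schema presumably has for BirthDate, StartDate, etc.
    val schema = StructType(Seq(StructField("BirthDate", DateType, nullable = true)))

    // The row carries the column name as a String; converting it to java.sql.Date
    // fails eagerly inside createDataFrame with exactly the exception above.
    spark.createDataFrame(List(Row("BirthDate")).asJava, schema)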

    1 Answer  |  6 years ago
  •  1
  •   Luis Miguel Mejía Suárez    6 years ago

    You don't need to build the header DataFrame with the schema of the data DataFrame; build the header out of plain strings and let the union coerce everything to string:

    import org.apache.spark.sql.{SparkSession, functions => sqlfunctions}
    
    val spark =
      SparkSession
      .builder
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    
    val dataDF =
      List(
        (1, "Luis"),
        (2, "kn3l")
      ).toDF("id", "name").withColumn("date", sqlfunctions.current_date())
    
    val headersDF = 
      List(
        ("id", "name", "date")
      ).toDF("id", "name", "date")
    
    val union = headersDF.unionByName(dataDF)
    // union: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, name: string, date: string]
    
    union.printSchema()
    // root
    // |-- id: string (nullable = true)
    // |-- name: string (nullable = true)
    // |-- date: string (nullable = true)
    
    union.show()
    // +---+----+----------+
    // | id|name|      date|
    // +---+----+----------+
    // | id|name|      date|
    // |  1|Luis|2018-12-05|
    // |  2|kn3l|2018-12-05|
    // +---+----+----------+
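
    Applied to the code in the question, the same idea looks roughly like this (a sketch: spark and responseWithSelectedColumns are the values from the question, the intermediate names are illustrative):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import scala.collection.JavaConverters._

    // Cast every data column to string so the header row and the data line up.
    val stringified = responseWithSelectedColumns.select(
      responseWithSelectedColumns.columns.map(c => col(c).cast("string").as(c)): _*
    )

    // Build the header row against an all-string schema instead of the data's
    // typed schema, which is what threw the ClassCastException.
    val headerSchema = StructType(
      stringified.columns.map(c => StructField(c, StringType, nullable = true))
    )
    val headerDF = spark.createDataFrame(
      List(Row.fromSeq(stringified.columns.toSeq)).asJava,
      headerSchema
    )

    val withHeader = headerDF.unionByName(stringified)

    One caveat with the original write path: after repartition(4) the header row can end up in any of the four part files, so the merged CSV is not guaranteed to start with it; the commented-out coalesce(1) keeps the header on top.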