代码之家  ›  专栏  ›  技术社区  ›  Kask

spark将字符串转换为时间戳类型

  •  0
  • Kask  · 技术社区  · 6 年前

    我有一个数据框架,我想在spark中插入Postgresql。在spark中,DateTimestamp列采用字符串格式。在postgreSQL中,它是没有时区的时间戳。

    在日期-时间列上插入数据库时引发错误。我确实尝试更改数据类型,但插入仍然出错。我不明白为什么演员阵容不起作用。如果我将相同的insert字符串粘贴到PgAdmin并运行,insert语句运行良好。

    import java.text.SimpleDateFormat;
    import java.util.Calendar
    object EtlHelper {
     // Return the current time stamp
    
      def getCurrentTime() : String = {    
        val now = Calendar.getInstance().getTime()   
        val hourFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")   
        return hourFormat.format(now)   
      }
     }  
    

    在另一个文件中

    object CreateDimensions {
    
    def createDimCompany(spark:SparkSession, location:String, propsLocation :String):Unit = {      
    import spark.implicits._    
    
    val dimCompanyStartTime = EtlHelper.getCurrentTime()
    val dimcompanyEndTime = EtlHelper.getCurrentTime()
    val prevDimCompanyId = 2
    val numRdd = 27
    val AuditDF = spark.createDataset(Array(("dim_company", prevDimCompanyId,numRdd,dimCompanyStartTime,dimcompanyEndTime))).toDF("audit_tbl_name","audit_tbl_id","audit_no_rows","audit_tbl_start_date","audit_tbl_end_date")//.show()
    
    AuditDF.withColumn("audit_tbl_start_date",AuditDF.col("audit_tbl_start_date").cast(DataTypes.TimestampType))
    AuditDF.withColumn("audit_tbl_end_date",AuditDF.col("audit_tbl_end_date").cast(DataTypes.TimestampType))
    
    AuditDF.printSchema()
    }  
    }
    
    root
     |-- audit_tbl_name: string (nullable = true)
     |-- audit_tbl_id: long (nullable = false)
     |-- audit_no_rows: long (nullable = false)
     |-- audit_tbl_start_date: string (nullable = true)
     |-- audit_tbl_end_date: string (nullable = true)
    

    这就是我得到的错误

    INSERT INTO etl.audit_master ("audit_tbl_name","audit_tbl_id","audit_no_rows","audit_tbl_start_date","audit_tbl_end_date") VALUES ('dim_company',27,2,'2018-05-02 12:15:54','2018-05-02 12:15:59') was aborted: ERROR: column "audit_tbl_start_date" is of type timestamp without time zone but expression is of type character varying
      Hint: You will need to rewrite or cast the expression.
    

    非常感谢您的帮助。

    非常感谢。

    2 回复  |  直到 6 年前
        1
  •  2
  •   Ramesh Maharjan    6 年前

    AuditDF.printSchema() 正在拿原件 AuditDF dataframe,因为您没有保存 .withColumn 通过指定。 数据帧是不可变的对象,可以转换为其他数据帧,但不能更改自身。因此,您总是需要一个赋值来保存已应用的转换。

    因此,正确的方法是分配以保存更改

    val transformedDF = AuditDF.withColumn("audit_tbl_start_date",AuditDF.col("audit_tbl_start_date").cast(DataTypes.TimestampType))
                              .withColumn("audit_tbl_end_date",AuditDF.col("audit_tbl_end_date").cast("timestamp"))
    
    transformedDF.printSchema()
    

    你会看到变化的

    root
     |-- audit_tbl_name: string (nullable = true)
     |-- audit_tbl_id: integer (nullable = false)
     |-- audit_no_rows: integer (nullable = false)
     |-- audit_tbl_start_date: timestamp (nullable = true)
     |-- audit_tbl_end_date: timestamp (nullable = true)
    

    .cast(DataTypes.TimestampType) .cast("timestamp") 都是一样的

        2
  •  1
  •   Arnon Rotem-Gal-Oz    6 年前

    问题的根源是@Ramesh提到的,即您没有将AuditDF中的更改分配给新值(val)。请注意,数据帧和您分配给它的值都是不可变的(即AuditDF是定义为val的,因此它也不能更改)

    另一件事是,您不需要重新发明轮子,也不需要使用EtlHelper spark的内置功能,它可以为您提供当前时间的时间戳:

    import org.apache.spark.sql.functions._
    
    val AuditDF = spark.createDataset(Array(("dim_company", prevDimCompanyId,numRdd)))
    .toDF("audit_tbl_name","audit_tbl_id","audit_no_rows")
    .withColumn("audit_tbl_start_date"current_timestamp())
    .withColumn("audit_tbl_end_date",current_timestamp())