
Spark - remove header and trailer from a CSV file

Gaurang Shah · asked 3 years ago

    I am trying to ingest a CSV file into a table, but before that I need to run some validations based on the header and trailer records.

    Sample data

    Header,TestApp,2020-01-01,
    name, dept, age, batchDate
    john, dept1, 33, 2020-01-01
    john, dept1, 33, 2020-01-01
    john, dept1, 33, 2020-01-01
    john, dept1, 33, 2020-01-01
    Trailer,count,4
    

    Here is what I am doing right now:

    import spark.implicits._
    import org.apache.spark.sql.functions.{max, monotonically_increasing_id}

    val df = spark.read.format("csv").load("/tmp/test.csv")
    // IDs are not consecutive across partitions, but they do follow row order,
    // so the header row gets the smallest id (0) and the trailer the largest
    val indexed = df.withColumn("index", monotonically_increasing_id())
    val last = indexed.agg(max($"index")).collect()(0)(0).asInstanceOf[Long]
    // Remove the header and trailer records
    val filtered = indexed.filter($"index" < last).filter($"index" >= 1)
    
    // Write the file back without the extra header and trailer records; it still
    // contains the column-name row. The helper index column must be dropped first.
    filtered.drop("index").write.format("csv").save("/tmp/test1")
    
    // Read back with inferSchema
    val df1 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/tmp/test1")
    
    // Count validation: the trailer record carries the expected number of data rows in _c2
    val count = indexed.filter($"index" === last).select("_c2").collect()(0)(0).asInstanceOf[String].toInt
    assert(df1.count == count)
    
    

    Is there a way to read the file with inferSchema directly, treating the column header (the second line of the file) as the header, so that the write-and-read-back step is not needed?
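
    A minimal sketch of one way to avoid the round trip, assuming Spark 2.2 or later (where DataFrameReader.csv accepts a Dataset[String]): read the raw lines, drop the Header/Trailer records, and let the CSV reader treat the remaining first line (the column row) as the header. The val names here are just for illustration.

    import spark.implicits._

    // Read the file as plain text, strip the Header/Trailer lines, then parse
    // the rest as CSV so the column row (line 2 of the file) becomes the header
    val lines = spark.read.textFile("/tmp/test.csv")
    val body = lines.filter(l => !l.startsWith("Header,") && !l.startsWith("Trailer,"))

    val parsed = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(body)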

1 answer

mvasyliv · answered 3 years ago
    import spark.implicits._
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.functions.{col, trim}

    // Read without a header so the Header/Trailer records parse as ordinary rows
    val df1 = spark.read.option("header", false).csv("/tmp/test.csv")
    
    val colNames = List("name", "dept", "age", "batchDate")
    // Drop the header record, the trailer record and the column-name row
    val df2 = df1
      .filter('_c0 =!= "Header")
      .filter('_c0 =!= "Trailer")
      .filter('_c0 =!= "name")
    df2.show(false)
    
    //  +----+------+---+-----------+
    //  |_c0 |_c1   |_c2|_c3        |
    //  +----+------+---+-----------+
    //  |john| dept1| 33| 2020-01-01|
    //  |john| dept1| 33| 2020-01-01|
    //  |john| dept1| 33| 2020-01-01|
    //  |john| dept1| 33| 2020-01-01|
    //  +----+------+---+-----------+
    
    val df3 = df2.toDF(colNames: _*)
    
    df3.show(false)
    //    +----+------+---+-----------+
    //    |name|dept  |age|batchDate  |
    //    +----+------+---+-----------+
    //    |john| dept1| 33| 2020-01-01|
    //    |john| dept1| 33| 2020-01-01|
    //    |john| dept1| 33| 2020-01-01|
    //    |john| dept1| 33| 2020-01-01|
    //    +----+------+---+-----------+
    
    df3.printSchema()
    //  root
    //  |-- name: string (nullable = true)
    //  |-- dept: string (nullable = true)
    //  |-- age: string (nullable = true)
    //  |-- batchDate: string (nullable = true)
    
    // Derive the target schema from a case class. schema.diff(sch) keeps only
    // the fields whose type differs from df3's all-string schema, i.e. exactly
    // the columns that still need a cast.
    case class SchemaClass(name: String, dept: String, age: Int, batchDate: String)
    val schema: StructType = Encoders.product[SchemaClass].schema
    val sch = df3.schema
    val schemaDiff = schema.diff(sch)

    // Trim and cast just those mismatched columns
    val rr = schemaDiff.foldLeft(df3)((acc, clmn) => {
      acc.withColumn(clmn.name, trim(col(clmn.name)).cast(clmn.dataType))
    })
    
    rr.show(false)
    //  +----+------+---+-----------+
    //  |name|dept  |age|batchDate  |
    //  +----+------+---+-----------+
    //  |john| dept1|33 | 2020-01-01|
    //  |john| dept1|33 | 2020-01-01|
    //  |john| dept1|33 | 2020-01-01|
    //  |john| dept1|33 | 2020-01-01|
    //  +----+------+---+-----------+
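
    One possible follow-up (a sketch, not part of the original answer): casting through a single select over the full target schema trims every column, including the string ones that schema.diff skips; notice dept above still carries its leading space. The name rr2 is just for illustration.

    // Trim and cast every column to its target type in one pass
    val rr2 = df3.select(schema.map(f => trim(col(f.name)).cast(f.dataType).as(f.name)): _*)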
    
    
    rr.printSchema
    //  root
    //  |-- name: string (nullable = true)
    //  |-- dept: string (nullable = true)
    //  |-- age: integer (nullable = true)
    //  |-- batchDate: string (nullable = true)
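
    A hedged sketch of wiring the question's trailer-count validation into this approach: the trailer record is still present in df1 before filtering, with the expected row count in _c2 (per the sample line "Trailer,count,4"). The name trailerCount is just for illustration.

    // Extract the expected count from the trailer record and compare
    val trailerCount = df1
      .filter('_c0 === "Trailer")
      .select('_c2)
      .as[String]
      .first()
      .trim
      .toInt

    assert(rr.count == trailerCount)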