I am trying to ingest a CSV file into a table, but before doing so I need to run some validations against its header and trailer records.
Sample data:
Header,TestApp,2020-01-01,
name, dept, age, batchDate
john, dept1, 33, 2020-01-01
john, dept1, 33, 2020-01-01
john, dept1, 33, 2020-01-01
john, dept1, 33, 2020-01-01
Trailer,count,4
Here is what I am doing currently:
import org.apache.spark.sql.functions.{max, monotonically_increasing_id}
import spark.implicits._

val df = spark.read.format("csv").load("/tmp/test.csv")
// monotonically_increasing_id gives increasing (not necessarily consecutive) ids;
// with a single input file/partition they run consecutively from 0
val indexed = df.withColumn("index", monotonically_increasing_id())
val last = indexed.agg(max($"index")).collect()(0)(0).asInstanceOf[Long]
// Remove the header and trailer records
val filtered = indexed.filter($"index" < last).filter($"index" >= 1)
// Write the file without the extra header and trailer; it still contains the column-name row
filtered.drop("index").write.format("csv").save("/tmp/test1")
// Read it back with schema inference
val df1 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/tmp/test1")
// Count validation: the record count is the third field (_c2) of the trailer row
val count = indexed.filter($"index" === last).select("_c2").collect()(0)(0).asInstanceOf[String].toInt
assert(df1.count == count)
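As an aside, one way to avoid the intermediate write/read round trip is to read the file as raw text, drop the header and trailer lines, and hand the remaining lines to `spark.read.csv`, which accepts a `Dataset[String]` since Spark 2.2. This is only a sketch: it assumes the data rows never start with the literal words "Header" or "Trailer", and that the file is small enough that the column-name row stays first in its partition.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("csv-ingest").getOrCreate()
import spark.implicits._

// Read the raw lines of the file
val lines = spark.read.textFile("/tmp/test.csv")

// The trailer line ("Trailer,count,4") carries the expected record count in its third field
val trailer = lines.filter(_.startsWith("Trailer")).first()
val expected = trailer.split(",")(2).trim.toInt

// Keep everything except the header and trailer lines; the column-name row
// stays first, so spark.read.csv can consume it with header=true
val body = lines.filter(l => !l.startsWith("Header") && !l.startsWith("Trailer"))

val df1 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(body)

assert(df1.count == expected)
```

Note the prefix filter is a simplifying assumption; if a data row could ever begin with "Header" or "Trailer", you would need the index-based approach instead.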
Is there a way to make inferSchema and the column header start from the column-header row (the second line in the file) directly, without this extra write/read round trip?