代码之家  ›  专栏  ›  技术社区  ›  Powers

从CSV文件创建Spark数据集

  •  11
  • Powers  · 技术社区  · 8 年前

    我想从一个简单的CSV文件创建一个Spark数据集。以下是CSV文件的内容:

    name,state,number_of_people,coolness_index
    trenton,nj,"10","4.5"
    bedford,ny,"20","3.3"
    patterson,nj,"30","2.2"
    camden,nj,"40","8.8"
    

    以下是制作数据集的代码:

    var location = "s3a://path_to_csv"
    
    case class City(name: String, state: String, number_of_people: Long)
    
    val cities = spark.read
      .option("header", "true")
      .option("charset", "UTF8")
      .option("delimiter",",")
      .csv(location)
      .as[City]
    

    number_of_people

    this blog post .

    编码器急切地检查您的数据是否与预期模式匹配, 处理TB的数据。例如,如果我们尝试使用 太小,以至于转换为对象会导致 截断(即numStudents大于一个字节,该字节包含一个 最大值255),Analyzer将发出AnalysisException。

    我正在使用 Long 类型,所以我没想到会看到此错误消息。

    2 回复  |  直到 7 年前
        1
  •  22
  •   2 revs<br/>user6022341    8 年前

    使用架构推断:

    val cities = spark.read
      .option("inferSchema", "true")
      ...
    

    或提供架构:

    val cities = spark.read
      .schema(StructType(Array(StructField("name", StringType), ...)
    

    或演员:

    val cities = spark.read
      .option("header", "true")
      .csv(location)
      .withColumn("number_of_people", col("number_of_people").cast(LongType))
      .as[City]
    
        2
  •  3
  •   Sparker0i    4 年前

    您的案例类为 case class City(name: String, state: String, number_of_people: Long) , 你只需要一行

    private val cityEncoder = Seq(City("", "", 0)).toDS
    

    然后你编码

    val cities = spark.read
    .option("header", "true")
    .option("charset", "UTF8")
    .option("delimiter",",")
    .csv(location)
    .as[City]
    

    就行了。

    这是官方消息来源[ http://spark.apache.org/docs/latest/sql-programming-guide.html#overview][1]