代码之家  ›  专栏  ›  技术社区  ›  Vlad

从未解析的文本字符串在spark中创建数据框

  •  0
  • Vlad  · 技术社区  · 6 年前

    我正在使用Scala和Spark分析一些数据。 对不起,我在这方面是个新手。

    我有以下格式的数据(如下) 我想创建RDD来过滤数据、分组和转换数据。

    目前我有rdd和未解析字符串列表 我从rawData创建了它:字符串列表

    val rawData  ( this is ListBuffer[String] )
    val rdd = sc.parallelize(rawData)
    

    如何创建数据集来操作数据? 我希望Rdd中的对象具有命名字段行ob。名称,对象。年份等 什么是正确的方法?

    我应该为此创建数据框吗?

    原始数据字符串如下所示:这是字符串列表,值以空格分隔

    列含义:“名称”、“年份”、“月份”、“tmax”、“tmin”、“afdays”、“rainmm”、“Sunhurs”

    aberporth    1941  10    ---     ---    ---    106.2     ---
    aberporth    1941  11    ---     ---    ---     92.3     ---
    aberporth    1941  12    ---     ---    ---     86.5     ---
    aberporth    1942   1    5.8     2.1    ---    114.0    58.0
    aberporth    1942   2    4.2    -0.6    ---     13.8    80.3
    aberporth    1942   3    9.7     3.7    ---     58.0   117.9
    aberporth    1942   4   13.1     5.3    ---     42.5   200.1
    aberporth    1942   5   14.0     6.9    ---    101.1   215.1
    aberporth    1942   6   16.2     9.9    ---      2.3   269.3
    aberporth    1942   7   17.4    11.3    12     70.2*   185.0
    aberporth    1942   8   18.7    12.3    5-     78.5   141.9
    aberporth    1942   9   16.4    10.7    123    146.8   129.1#
    aberporth    1942  10   13.1     8.2    125    131.1    82.1l
    

    ----表示没有数据,我想我可以将0放入此列。

    70.2*、129.1#、82。这里的l-*、#和l应该被过滤

    请给我指一下正确的方向。

    我在这里找到了一个可能的解决方案: https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393

    这个例子看起来不错:

    val someData = Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    )
    
    val someSchema = List(
      StructField("number", IntegerType, true),
      StructField("word", StringType, true)
    )
    
    val someDF = spark.createDataFrame(
      spark.sparkContext.parallelize(someData),
      StructType(someSchema)
    )
    

    如何将字符串列表转换为行的Seq?

    1 回复  |  直到 6 年前
        1
  •  1
  •   koiralo    6 年前

    您可以将数据读取为文本文件并替换 --- 具有 0 并删除特殊字符或过滤掉。(我已在下面的示例中替换)

    创建一个case类来表示数据

    case class Data(
                     name: String, year: String, month: Int, tmax: Double,
                     tmin: Double, afdays: Int, rainmm: Double, sunhours: Double
                 )
    

    读取文件

    val data = spark.read.textFile("file path")  //read as a text file
      .map(_.replace("---", "0").replaceAll("-|#|\\*", ""))  //replace special charactes 
      .map(_.split("\\s+"))
      .map(x =>  // create Data object for each record
        Data(x(0), x(1), x(2).toInt, x(3).toDouble, x(4).toDouble, x(5).toInt, x(6).toDouble, x(7).replace("l", "").toDouble)
      )
    

    现在你可以 Dataset[Data] 这是从文本解析的数据集。

    输出:

    +---------+----+-----+----+----+------+------+--------+
    |name     |year|month|tmax|tmin|afdays|rainmm|sunhours|
    +---------+----+-----+----+----+------+------+--------+
    |aberporth|1941|10   |0.0 |0.0 |0     |106.2 |0.0     |
    |aberporth|1941|11   |0.0 |0.0 |0     |92.3  |0.0     |
    |aberporth|1941|12   |0.0 |0.0 |0     |86.5  |0.0     |
    |aberporth|1942|1    |5.8 |2.1 |0     |114.0 |58.0    |
    |aberporth|1942|2    |4.2 |0.6 |0     |13.8  |80.3    |
    |aberporth|1942|3    |9.7 |3.7 |0     |58.0  |117.9   |
    |aberporth|1942|4    |13.1|5.3 |0     |42.5  |200.1   |
    |aberporth|1942|5    |14.0|6.9 |0     |101.1 |215.1   |
    |aberporth|1942|6    |16.2|9.9 |0     |2.3   |269.3   |
    |aberporth|1942|7    |17.4|11.3|12    |70.2  |185.0   |
    |aberporth|1942|8    |18.7|12.3|5     |78.5  |141.9   |
    |aberporth|1942|9    |16.4|10.7|123   |146.8 |129.1   |
    |aberporth|1942|10   |13.1|8.2 |125   |131.1 |82.1    |
    +---------+----+-----+----+----+------+------+--------+
    

    我希望这有帮助!