代码之家  ›  专栏  ›  技术社区  ›  Ged

通过createDataFrame将带有Array[String]的RDD转换为DF时出错

  •  -1
  • Ged  · 技术社区  · 6 年前

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
    
    val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
    val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
    
    val rddWithId = df.rdd.zipWithIndex
    val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
    

    这种结构:

    rddWithZipId: org.apache.spark.rdd.RDD[((String, Int, Array[String]), Long)] = ZippedWithIndexRDD[149] at zipWithIndex at command-2467674133341972:32
    

    当我执行与前面示例完全相同的操作时产生错误。唯一的区别是数组[String]。

    产生的错误是:

    notebook:45: error: value toSeq is not a member of (String, Int, Array[String])
    val dfPosts =  spark.createDataFrame(rddWithZipId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
    

    环顾四周,我不明白这为什么行不通。我注意到,尽管我在RDD中看到4个类型/元素,但我可以观察到行被看作1个结构。

    有什么想法吗?有其他可能的方法,但我不明白为什么第一个例子有效,第二个不?数组[String]似乎是罪魁祸首。事实上,这是必须的,但如何绕过这一点?

    val dfPosts =  rddWithZipId.toDF()
    

    返回一个嵌套的模式,如下所示,所以可能就是这样,但问题仍然是如上所述。嵌套结构意味着我可以实现我想做的,这不是问题。

    root
      |-- _1: struct (nullable = true)
      |    |-- _1: string (nullable = true)
      |    |-- _2: integer (nullable = false)
      |    |-- _3: array (nullable = true)
      |    |    |-- element: string (containsNull = true)
      |-- _2: long (nullable = false)
    

    我认为嵌套结构需要以某种方式定义。

    UPD:事后看来,我肯定是学习太晚了,不知道是什么问题。

    1 回复  |  直到 5 年前
        1
  •  0
  •   Aaron Makubuya    6 年前

    有其他可能的方法,但我不明白为什么第一个例子有效,第二个不?数组[String]似乎是罪魁祸首。

    rddWithId

    RDD[(Row, Long)]
    

    第二种结构是

    RDD[((String, Int, Array[String]), Long)]
    

    所以在第一种情况下 _1 org.apache.spark.sql.Row 它提供 toSeq 方法,而在第二种情况下 _1 Tuple3[_, _, _] 但没有提供这样的方法。

    如果你想成功的话

    Row.fromSeq(row.toSeq ++ Array(index))
    

    代替

    Row.fromSeq(("a", 1, Array("foo")).productIterator.toSeq ++ Array(index))
    

    或者更好(为什么要初始化其他 Array

    Row.fromSeq(("a", 1, Array("foo")).productIterator.toSeq :+ index)