import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
这种结构:
rddWithZipId: org.apache.spark.rdd.RDD[((String, Int, Array[String]), Long)] = ZippedWithIndexRDD[149] at zipWithIndex at command-2467674133341972:32
当我执行与前面示例完全相同的操作时产生错误。唯一的区别是数组[String]。
产生的错误是:
notebook:45: error: value toSeq is not a member of (String, Int, Array[String])
val dfPosts = spark.createDataFrame(rddWithZipId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
环顾四周,我不明白这为什么行不通。我注意到,尽管我在RDD中看到4个类型/元素,但我可以观察到行被看作1个结构。
有什么想法吗?有其他可能的方法,但我不明白为什么第一个例子有效,第二个不?数组[String]似乎是罪魁祸首。事实上,这是必须的,但如何绕过这一点?
val dfPosts = rddWithZipId.toDF()
返回一个嵌套的模式,如下所示,所以可能就是这样,但问题仍然是如上所述。嵌套结构意味着我可以实现我想做的,这不是问题。
root
|-- _1: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: integer (nullable = false)
| |-- _3: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- _2: long (nullable = false)
我认为嵌套结构需要以某种方式定义。
UPD:事后看来,我肯定是学习太晚了,不知道是什么问题。