I have a nested JSON dataset that I am processing with Spark in Scala. When I try to flatten it, it fails because of duplicate columns. So I need to rename the duplicates first and then flatten, but I am unable to do that. Below is the code I use to flatten the dataset:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDataframe(df: DataFrame): DataFrame = {
  // Get all the fields from the schema
  val fields = df.schema.fields
  val fieldNames = fields.map(_.name)

  for (i <- fields.indices) {
    val field = fields(i)
    val fieldName = field.name
    field.dataType match {
      case _: ArrayType =>
        // Explode the array column, keeping every other column as-is
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++
          Array(s"explode_outer($fieldName) as $fieldName")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)
      case structType: StructType =>
        // Promote each child of the struct to a top-level column,
        // replacing '.' and other unsafe characters in the new name
        val childFieldNames = structType.fieldNames.map(child => s"$fieldName.$child")
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val renamedCols = newFieldNames.map { x =>
          col(x).as(
            x.replace(".", "_").replace("$", "_").replace("__", "_")
              .replace(" ", "").replace("-", ""))
        }
        val renamedDf = df.select(renamedCols: _*)
        return flattenDataframe(renamedDf)
      case _ =>
    }
  }
  df
}
After flattening, the dataset looks like this:

I have used the loop below to rename the duplicate columns, but it only works when the duplicated JSON elements are at the same level:
val orgList = df.columns.toList // take the original column list
// take the list of duplicate column names, compared case-insensitively
val dupsList = orgList.map(_.toLowerCase()).diff(orgList.map(_.toLowerCase()).distinct).distinct

var i = 1
var newDf: DataFrame = df
for (dups <- dupsList) {
  i = 1
  for (key <- orgList) {
    if (key.toLowerCase() == dups) {
      // append a running suffix to each duplicate: name_1, name_2, ...
      newDf = newDf.withColumnRenamed(key, key + "_" + i)
      i += 1
    }
  }
}
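For what it's worth, one way I have considered avoiding the collision entirely is to build unique names while flattening, instead of renaming afterwards. This is only a sketch (it assumes the duplicates live inside struct fields; arrays would still need the explode step from the function above): it recursively collects every leaf path, flattens the path into the alias, and deduplicates case-insensitively with a running suffix.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Recursively collect each leaf path in the schema together with a
// flattened alias ("parent.child" -> "parent_child").
def leafColumns(schema: StructType, prefix: String = ""): Seq[(String, String)] =
  schema.fields.flatMap { field =>
    val path = if (prefix.isEmpty) field.name else s"$prefix.${field.name}"
    field.dataType match {
      case st: StructType => leafColumns(st, path)
      case _              => Seq((path, path.replace(".", "_")))
    }
  }

// Select every leaf with a unique alias: names that collide
// case-insensitively get _1, _2, ... appended.
def flattenWithUniqueNames(df: DataFrame): DataFrame = {
  val seen = scala.collection.mutable.Map.empty[String, Int]
  val cols = leafColumns(df.schema).map { case (path, alias) =>
    val n = seen.getOrElse(alias.toLowerCase, 0)
    seen(alias.toLowerCase) = n + 1
    col(path).as(if (n == 0) alias else s"${alias}_$n")
  }
  df.select(cols: _*)
}
```

This sidesteps the need to rename before flattening, but I am not sure it is the right approach for my data, which also contains arrays.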
How can I rename the duplicate columns before flattening?