
How to dynamically rename duplicate columns in nested JSON using Spark Scala

  • whatsinthename · Tech Community · 3 years ago

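    I have a nested JSON dataset that I read with Spark Scala. When I try to flatten it, it fails because of duplicate columns. So I need to rename those columns first and then flatten, but I haven't managed to do that. Below is the code I use to flatten the dataset: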

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{ArrayType, StructType}

    def flattenDataframe(df: DataFrame): DataFrame = {
        // all top-level fields of the current schema
        val fields = df.schema.fields
        val fieldNames = fields.map(_.name)
        for (field <- fields) {
          field.dataType match {
            case _: ArrayType =>
              // keep every other column and explode the array column in place;
              // explode_outer keeps rows whose array is null or empty
              val fieldNamesAndExplode =
                fieldNames.filter(_ != field.name) :+ s"explode_outer(${field.name}) as ${field.name}"
              return flattenDataframe(df.selectExpr(fieldNamesAndExplode: _*))

            case structType: StructType =>
              // replace the struct column with its children, addressed as parent.child
              val childFieldNames = structType.fieldNames.map(child => s"${field.name}.$child")
              val newFieldNames = fieldNames.filter(_ != field.name) ++ childFieldNames
              // rename parent.child to parent_child and strip characters awkward in column names
              val renamedCols = newFieldNames.map { name =>
                col(name).as(name.replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", ""))
              }
              return flattenDataframe(df.select(renamedCols: _*))

            case _ => // primitive column: nothing to flatten
          }
        }
        // no array or struct columns left
        df
      }
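The renaming rule in the `StructType` branch is not one-to-one, so flattening alone can produce duplicate names even when no JSON object repeats a key: a struct child `a.b` and a top-level column `a_b` both end up as `a_b`. The plain-Scala sketch below (the object and method names are hypothetical, not part of the code above) applies the same `replace` chain to a list of column paths and groups the ones that collide. It lower-cases the result on the assumption that the session uses Spark's default case-insensitive name resolution (`spark.sql.caseSensitive=false`).

```scala
object FlatNameCollisions {
  // Same replace chain as the StructType branch of flattenDataframe
  def flatName(path: String): String =
    path.replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", "")

  // Group paths by the (lower-cased) flattened name they map to and keep only the clashes
  def collisions(paths: Seq[String]): Map[String, Seq[String]] =
    paths.groupBy(p => flatName(p).toLowerCase).filter { case (_, ps) => ps.size > 1 }

  def main(args: Array[String]): Unit = {
    // "a.b" and "a_b" both become a_b; "a.b-c" and "a.bc" both become a_bc
    val paths = Seq("a.b", "a_b", "a.b-c", "a.bc", "x.y z")
    collisions(paths).foreach { case (name, ps) => println(s"$name <- ${ps.mkString(", ")}") }
  }
}
```

Running such a check on the schema paths before flattening shows which clashes come from the dataset itself and which are introduced by the renaming rule.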
    

    After flattening, the dataset looks like this:

    [Image: the flattened dataset]

    I used the following loop to rename duplicate columns, but it only works for JSON elements at the same level:

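    import org.apache.spark.sql.DataFrame

    def renameDuplicateColumns(df: DataFrame): DataFrame = {
      val orgList = df.columns.toList // take the original list
      // duplicate column names, compared in lower case
      val dupsList = orgList.map(_.toLowerCase).diff(orgList.map(_.toLowerCase).distinct).distinct
      val newNames = orgList.toArray
      for (dups <- dupsList) {
        var i = 1
        for (idx <- orgList.indices if orgList(idx).toLowerCase == dups) {
          newNames(idx) = s"${orgList(idx)}_$i"
          i += 1
        }
      }
      // toDF renames by position; withColumnRenamed(key, ...) renames every column
      // that resolves to key, so all duplicates would receive the same suffix
      df.toDF(newNames: _*)
    }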
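One possible way to deal with duplicates at every level before flattening is to rename them in the schema instead of in the column list: walk the `StructType` recursively, apply the same `_1`, `_2` suffix rule inside each struct (including structs inside arrays and maps), and re-apply the renamed schema to the rows, which Spark stores positionally. This is a sketch, assuming duplicates are compared case-insensitively as in the loop above; `NestedDedup` and its methods are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}

object NestedDedup {
  // Rename duplicate field names (case-insensitive) within one struct level,
  // appending _1, _2, ... in order of appearance
  private def dedupLevel(fields: Seq[StructField]): Seq[StructField] = {
    val lower = fields.map(_.name.toLowerCase)
    val dups = lower.diff(lower.distinct).toSet
    val counters = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
    fields.map { f =>
      val key = f.name.toLowerCase
      if (dups.contains(key)) {
        counters(key) += 1
        f.copy(name = s"${f.name}_${counters(key)}")
      } else f
    }
  }

  // Recurse through the type tree so duplicates are renamed at every nesting level
  def dedupType(dt: DataType): DataType = dt match {
    case s: StructType =>
      StructType(dedupLevel(s.fields.toSeq).map(f => f.copy(dataType = dedupType(f.dataType))))
    case a: ArrayType => a.copy(elementType = dedupType(a.elementType))
    case m: MapType => m.copy(keyType = dedupType(m.keyType), valueType = dedupType(m.valueType))
    case other => other
  }

  // Rows are matched to the schema by position, so only the names change
  def renameDuplicatesEverywhere(df: DataFrame): DataFrame =
    df.sparkSession.createDataFrame(df.rdd, dedupType(df.schema).asInstanceOf[StructType])
}
```

Usage would be along the lines of `flattenDataframe(NestedDedup.renameDuplicatesEverywhere(df))`. The round trip through `RDD[Row]` adds serialization cost on large data, and the sketch does not catch names that only collide after `flattenDataframe` turns `.` into `_`, nor a suffixed name such as `id_1` that already exists in the data.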
    

    How can I rename the duplicate columns before flattening the dataset?
