代码之家  ›  专栏  ›  技术社区  ›  dejanmarich

在Spark中将扁平数据帧转换为结构

  •  0
  • dejanmarich  · 技术社区  · 5 年前

    我有一个很深的窝 JSON 我必须处理的文件,为了做到这一点,我必须将它们展平,因为找不到散列一些深嵌套字段的方法。这就是我的 dataframe 看起来像(压平后):

    scala> flattendedJSON.printSchema
    root
     |-- header_appID: string (nullable = true)
     |-- header_appVersion: string (nullable = true)
     |-- header_userID: string (nullable = true)
     |-- body_cardId: string (nullable = true)
     |-- body_cardStatus: string (nullable = true)
     |-- body_cardType: string (nullable = true)
     |-- header_userAgent_browser: string (nullable = true)
     |-- header_userAgent_browserVersion: string (nullable = true)
     |-- header_userAgent_deviceName: string (nullable = true)
     |-- body_beneficiary_beneficiaryAccounts_beneficiaryAccountOwner: string (nullable = true)
     |-- body_beneficiary_beneficiaryPhoneNumbers_beneficiaryPhoneNumber: string (nullable = true)
    

    scala> nestedJson.printSchema
    root
     |-- header: struct (nullable = true)
     |    |-- appID: string (nullable = true)
     |    |-- appVersion: string (nullable = true)
     |    |-- userAgent: struct (nullable = true)
     |    |    |-- browser: string (nullable = true)
     |    |    |-- browserVersion: string (nullable = true)
     |    |    |-- deviceName: string (nullable = true)
     |-- body: struct (nullable = true)
     |    |-- beneficiary: struct (nullable = true)
     |    |    |-- beneficiaryAccounts: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- beneficiaryAccountOwner: string (nullable = true)
     |    |    |-- beneficiaryPhoneNumbers: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- beneficiaryPhoneNumber: string (nullable = true)
     |    |-- cardId: string (nullable = true)
     |    |-- cardStatus: string (nullable = true)
     |    |-- cardType: string (nullable = true)
    

    我已经设法用一个嵌套字段来实现它,但是如果它更多,它就不能工作,我也找不到正确的方法来实现它。以下是我尝试的:

     val structColumns = flattendedJSON.columns.filter(_.contains("_"))
      val structColumnsMap = structColumns.map(_.split("\\_")).
      groupBy(_(0)).mapValues(_.map(_(1)))
    
      val dfExpanded = structColumnsMap.foldLeft(flattendedJSON){ (accDF, kv) =>
      val cols = kv._2.map(v => col("`" + kv._1 + "_" + v + "`").as(v))
      accDF.withColumn(kv._1, struct(cols: _*))
    }
    val dfResult = structColumns.foldLeft(flattendedJSON)(_ drop _)
    

    如果我有一个嵌套对象(例如。 header_appID ),但如果 header_userAgent_browser ,我得到一个例外:

    org.apache.spark.sql.AnalysisException:无法解析 ' header_userAgent

    使用 Spark 2.3 Scala 2.11.8

    0 回复  |  直到 5 年前
        1
  •  0
  •   Cesar A. Mostacero    5 年前

    case classes 和一个 Dataset 而不是压平 DF 然后再次尝试转换成旧的 json 格式。即使它有嵌套对象,也可以定义一组 案例类别 投下它。它允许您使用对象表示法,使事情比 测向 . 有一些工具可以提供 json文件 它会为你生成类(我用这个: https://json2caseclass.cleverapps.io ). 如果你想把它从 测向 ,另一种方法是创建 使用 map 在你的 测向 . 像这样:

    case class NestedNode(fieldC: String, fieldD: String)   // for JSON
    case class MainNode(fieldA: String, fieldB: NestedNode) // for JSON
    case class FlattenData(fa: String, fc: String, fd: String) 
    
    Seq(
      FlattenData("A1", "B1", "C1"),
      FlattenData("A2", "B2", "C2"),
      FlattenData("A3", "B3", "C3")
    ).toDF
     .as[FlattenData] // Cast it to access with object notation
     .map(flattenItem=>{
        MainNode(flattenItem.fa, NestedNode(flattenItem.fc, flattenItem.fd) ) // Creating output format
      })
    

    最后,用类定义的模式将由 yourDS.write.mode(your_save_mode).json(your_target_path)