Parsing JSON into a DataFrame in Scala

  •  0
  • Surender Raja  · Tech Community  · 6 years ago

    Sample JSON

     "alternateId": [
        {
            "type": "POPID",
            "value": "1-7842-0759-001"
        },
        {
            "type": "CAMID",
            "value": "CAMID 0000-0002-7EC1-02FF-O-0000-0000-2"
        },
        {
            "type": "ProgrammeUuid",
            "value": "1ddb01e2-6146-4e10-bba9-dde40d0ad886"
        }
    ]
    

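    I want to update an existing DataFrame with two columns, POPID and CAMID. Both values need to be parsed from the JSON structure above, but I don't know how to parse it. Can you help me work out what to change in the fetchField method? In the JSON above POPID comes first and CAMID second, but in the real JSON each of them can appear at any of the three positions inside alternateId.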

    val fetchCAMID_udf = udf(fetchCAMID _)
    val fetchPOPID_udf = udf(fetchPOPID _)
    
    var updatedDf = // DataFrame initialization
    
    updatedDf = updatedDf.withColumn("CAMID", fetchCAMID_udf(col("alternate_id")))
    updatedDf = updatedDf.withColumn("POPID", fetchPOPID_udf(col("alternate_id")))
    updatedDf.show(10, false)
    
    
    def fetchCAMID(jsonStr: String): String = {
      fetchField(jsonStr, "CAMID")
    }
    
    def fetchPOPID(jsonStr: String): String = {
      fetchField(jsonStr, "POPID")
    }
    
    
    def fetchField(jsonStr: String, fieldName: String): String = {
      try {
        implicit val formats = DefaultFormats
        val extractedField = jsonStr match {
          case "(unknown)" => jsonStr
          case _ => {
            val json = JsonMethods.parse(jsonStr)
            val resultExtracted = (json \\ fieldName)
            val result = resultExtracted match {
              case _: JString => resultExtracted.extract[String]
              case _: JInt => resultExtracted.extract[Int].toString
              case _: JObject => "(unknown)"
            }
            result
          }
        }
        extractedField
      } catch {
        case e: Exception => {
          log.error(s"Fetch field failed. Field name: $fieldName . Json: $jsonStr")
          "(unknown)"
        }
      }
    }
    
    2 answers  |  last activity 6 years ago
        1
  •  1
  •   Ramesh Maharjan    6 years ago

    Change your fetchField function as follows:

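    def fetchField(jsonStr: String, fieldName: String): String = {
      try {
        // The column holds a bare "alternateId": [...] fragment, so wrap it in braces to get valid JSON
        val alternateId = JsonMethods.parse("{" + jsonStr + "}") \ "alternateId"
        // Pair every entry's "type" with its "value", regardless of the entry's position in the array
        val typeAndValue = (alternateId \ "type" \\ classOf[JString]).zip(alternateId \ "value" \\ classOf[JString])
        // .head throws when no entry matches; the catch below turns that into "(unknown)"
        typeAndValue.filter(_._1 == fieldName).map(_._2).head
      } catch {
        case e: Exception => "(unknown)"
      }
    }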
    

    With this change, the CAMID and POPID columns are populated.
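
    As a variation on the same idea, here is a minimal sketch that looks an entry up by its "type" and returns an Option instead of a sentinel string. It assumes json4s with the Jackson backend is on the classpath and that the column holds the bare "alternateId": [...] fragment shown in the question. The object name AlternateIdLookup and the method lookup are hypothetical names, not part of the original post.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods
import scala.util.Try

object AlternateIdLookup {
  // Hypothetical helper (not from the original post): returns the "value" of the
  // entry whose "type" equals `wanted`, or None if parsing fails or nothing matches.
  def lookup(fragment: String, wanted: String): Option[String] = {
    // Wrap the bare fragment in braces so that it parses as a JSON object
    val parsed = Try(JsonMethods.parse("{" + fragment + "}")).toOption
    parsed.flatMap { json =>
      (json \ "alternateId").children
        // Find the first array entry with the requested type, wherever it sits
        .collectFirst { case entry if (entry \ "type") == JString(wanted) => entry \ "value" }
        // Keep the result only if the value is a JSON string
        .collect { case JString(v) => v }
    }
  }
}
```

    Inside a UDF this could be called as AlternateIdLookup.lookup(jsonStr, "CAMID").getOrElse("(unknown)"), which keeps the fallback used in the question while making the "no match" case explicit.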

        2
  •  0
  •   Ramesh Maharjan    6 years ago

    You can also let Spark read the JSON and extract the values with ordinary Spark operations:

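    import spark.implicits._  // enables the $"..." column syntax
    val df = spark.read.option("multiLine", true).json("test.json")
    // Positional lookup: assumes POPID is entry 0 and CAMID is entry 1 of alternateId
    df.select($"alternateId".getItem(0).as("pop"), $"alternateId".getItem(1).as("cam"))
      .select($"pop.value".as("POPID"), $"cam.value".as("CAMID")).show()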
    
    +---------------+--------------------+
    |          POPID|               CAMID|
    +---------------+--------------------+
    |1-7842-0759-001|CAMID 0000-0002-7...|
    +---------------+--------------------+
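
    The getItem(0) / getItem(1) lookup matches the sample, but the question says the entries can appear in any order. A possible position-independent sketch, assuming Spark 2.4 or later (for the SQL higher-order function filter) and a top-level alternateId array of type/value structs, is shown below. The names AlternateIdColumns and extractIds are hypothetical, and the sample document deliberately puts CAMID first. How a missing type is handled depends on the Spark version and its ANSI settings, so treat this as a starting point rather than a finished solution.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object AlternateIdColumns {
  // Hypothetical helper: selects each entry by its "type" rather than by array index
  def extractIds(df: DataFrame): DataFrame = df.select(
    expr("filter(alternateId, x -> x.`type` = 'POPID')[0].value").as("POPID"),
    expr("filter(alternateId, x -> x.`type` = 'CAMID')[0].value").as("CAMID"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("alternate-id-columns")
      .getOrCreate()
    import spark.implicits._

    // Illustrative document with the entries in a different order than the question's sample
    val sample =
      """{"alternateId":[{"type":"CAMID","value":"CAMID 0000-0002-7EC1-02FF-O-0000-0000-2"},{"type":"POPID","value":"1-7842-0759-001"}]}"""

    // Build a one-row DataFrame from the JSON string instead of reading test.json
    val df = spark.read.json(Seq(sample).toDS())

    extractIds(df).show(false)
    spark.stop()
  }
}
```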