代码之家  ›  专栏  ›  技术社区  ›  Surender Raja

Json解析在Spark UDF中抛出意外输出

  •  1
  • Surender Raja  · 技术社区  · 7 年前

    该数据帧中所有列的数据类型都是字符串。其中一些列是jsonString

     +--------+---------+--------------------------+
     |event_id|event_key|              rights      |
     +--------+---------+--------------------------+
     |     410|(default)|{"conditions":[{"devic...|
     +--------+---------+--------------------------+
    

    以下是“权利”的价值

     {
    "conditions": [
        {
            "devices": [
                {
                    "connection": [
                        "BROADBAND",
                        "MOBILE"
                    ],
                    "platform": "IOS",
                    "type": "MOBILE",
                    "provider": "TELETV"
                },
                {
                    "connection": [
                        "BROADBAND",
                        "MOBILE"
                    ],
                    "platform": "ANDROID",
                    "type": "MOBILE",
                    "provider": "TELETV"
                },
                {
                    "connection": [
                        "BROADBAND",
                        "MOBILE"
                    ],
                    "platform": "IOS",
                    "type": "TABLET",
                    "provider": "TELETV"
                },
                {
                    "connection": [
                        "BROADBAND",
                        "MOBILE"
                    ],
                    "platform": "ANDROID",
                    "type": "TABLET",
                    "provider": "TELETV"
                }
            ],
            "endDateTime": "2017-01-09T22:59:59.000Z",
            "inclusiveGeoTerritories": [
                "DE",
                "IT",
                "ZZ"
            ],
            "mediaType": "Linear",
            "offers": [
                {
                    "endDateTime": "2017-01-09T22:59:59.000Z",
                    "isRestartable": true,
                    "isRecordable": true,
                    "isCUTVable": false,
                    "recordingMode": "UNIQUE",
                    "retentionCUTV": "P7DT2H",
                    "retentionNPVR": "P2Y6M5DT12H35M30S",
                    "offerId": "MOTOGP-RACE",
                    "offerType": "IPPV",
                    "startDateTime": "2017-01-09T17:00:00.000Z"
                }
            ],
            "platformName": "USA",
            "startDateTime": "2017-01-09T17:00:00.000Z",
            "territory": "USA"
        }
     ]
    }
    

    现在,我想在现有数据帧中创建一个新列。要添加的新列的名称为“provider”

     conditions -> devices -> provider
    

    以字符串形式返回值

     import org.apache.spark.sql.functions.udf
     import org.apache.spark.sql.functions._
     import org.json4s._
     import org.json4s.jackson.JsonMethods
     import org.json4s.jackson.JsonMethods._
    
    
      //
         some codes to derive base dataframe
      //
    
      val fetchProvider_udf = udf(fetchProvider _)
      val result = df.withColumn("provider",fetchProvider_udf(col("rights")))
       result.select("event_id","event_key","rights","provider").show(10)
    
    
      def fetchProvider(jsonStr:String): String = {
    
        val json = JsonMethods.parse(jsonStr)
    
       val providerData = json \\ "conditions" \\"devices" \\ "provider"
    
       compact(render(providerData))
      }
    

    还有,如果导航键不可用,我该如何处理?它是否引发异常?假设有“条件”和“设备”,但json字符串中没有“提供者”键。那我该怎么办?

     +--------+---------+-----------------------+-------------+
     |event_id|event_key|              rights     |provider     |
     +--------+---------+-----------------------+-------------+
     |     410|(unknown)|{"conditions":[{"devic...|    TELETV    |
     +--------+---------+-----------------------+-------------+
    

    但我得到以下输出

     +--------+---------+-----------------------+-------------------------------     ------------------------------------------------------+
     |event_id|event_key|              rights        |                                                     provider     |
          +--------+---------+-----------------------+--------------------------      -----------------------------------------------------------+
     |     410|(unknown)|{"conditions":[{"devic...|    {"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV"      }   |
       +--------+---------+-----------------------+-----------------------------       --------------------------------------------------------+
    
    1 回复  |  直到 5 年前
        1
  •  0
  •   Mariusz    7 年前

    如果要提取第一个提供者的值,应在UDF中使用以下代码:

    (json \\ "conditions" \\"devices")[0] \\ "provider"
    

    当前代码只获取所有提供者(作为映射),然后将其转换为字符串作为UDF结果。

    您还应该确保UDF不会引发任何异常(因为它会导致整个作业失败)。最简单的方法是返回null,然后:

    • df.provider.isNull()
    • 如果只想保留有效条目-按筛选 df.provider.isNullNull()