
Spark: how to merge rows into a JSON array

  BAE · 5 years ago · score 0

    id1   id2    name   value           epid
    "xxx" "yyy"  "EAN"  "5057723043"    "1299"
    "xxx" "yyy"  "MPN"  "EVBD"          "1299"
    

    I want:

    {         "id1": "xxx",
              "id2": "yyy",
              "item_specifics": [
                {
                  "name": "EAN",
                  "value": "5057723043"
                },
                {
                  "name": "MPN",
                  "value": "EVBD"
                },
                {
                  "name": "EPID",
                  "value": "1299"
                }
              ]
    }
    

    How do I aggregate columns into a JSON array? How do I merge rows into a column of a Spark DataFrame as valid JSON so that I can write it to MySQL?

    pi_df.groupBy(col("id1"), col("id2"))
      //.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
      .agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))
    

    { "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }
    

    How can I fix this? Thanks.

  Answer 1 · Apurba Pandey · 5 years ago · score 5

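    You can create two DataFrames, one holding the name/value pairs and one with "EPID" as the name and the epid value as the value, and union them. Then aggregate the result with collect_set and build the JSON. The code would look like this: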

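    import org.apache.spark.sql.functions._ // already imported in spark-shell
    
    // Create test data (toDF needs spark.implicits._, which spark-shell also imports)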
    val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") )
      .toDF("id1", "id2", "name", "value", "epid")
    
    df.show(false)
    
    +---+---+----+----------+----+
    |id1|id2|name|value     |epid|
    +---+---+----+----------+----+
    |xxx|yyy|EAN |5057723043|1299|
    |xxx|yyy|MPN |EVBD      |1299|
    +---+---+----+----------+----+
    
    val df1 = df.withColumn("map", struct(col("name"), col("value")))
      .select("id1", "id2", "map")
    
    val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value")))
      .select("id1", "id2", "map")
    
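    val jsonDF = df1.union(df2).groupBy("id1", "id2")
      // collect_set drops duplicate entries (use collect_list to keep them); element order is not guaranteed
      .agg(collect_set("map").as("item_specifics"))
      .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))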
    
    jsonDF.select("json").show(false)
    
    +---------------------------------------------------------------------------------------------------------------------------------------------+
    |json                                                                                                                                         |
    +---------------------------------------------------------------------------------------------------------------------------------------------+
    |{"id1":"xxx","id2":"yyy","item_specifics":[{"name":"MPN","value":"EVBD"},{"name":"EAN","value":"5057723043"},{"name":"EPID","value":"1299"}]}|
    +---------------------------------------------------------------------------------------------------------------------------------------------+
    

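    For Spark 2.4+ (array_union was added in 2.4), you can skip the union and merge two collected arrays instead: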

    val jsonDF = df.withColumn("map1", struct(col("name"), col("value")))
      .withColumn("map2", struct(lit("EPID").as("name"), col("epid").as("value")))
      .groupBy("id1", "id2")
      .agg(collect_set("map1").as("item_specifics1"),
        collect_set("map2").as("item_specifics2"))
      // array_union merges both arrays and removes duplicates
      .withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2")))
      // serialize the merged array (not item_specifics2 alone)
      .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))
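    Another Spark 2.4+ variant avoids both the union and the second collect_set: group on epid as well and append the EPID entry with concat. This is a sketch, assuming epid is constant within each (id1, id2) group (as in the sample data); the object name ItemSpecificsConcat and the intermediate column specs are illustrative, not from the answers above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object ItemSpecificsConcat {
  // Assumption: epid is the same for every row of an (id1, id2) group,
  // so it can be used as an extra grouping key.
  def build(df: DataFrame): DataFrame =
    df.groupBy("id1", "id2", "epid")
      // one {name, value} struct per input row
      .agg(collect_list(struct(col("name"), col("value"))).as("specs"))
      // append a single {"name": "EPID", "value": <epid>} struct (concat on arrays needs Spark 2.4+)
      .withColumn("item_specifics",
        concat(col("specs"), array(struct(lit("EPID").as("name"), col("epid").as("value")))))
      // serialize the whole record into one JSON string column
      .select(to_json(struct(col("id1"), col("id2"), col("item_specifics"))).as("json"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("item-specifics").getOrCreate()
    import spark.implicits._

    val piDf = Seq(
      ("xxx", "yyy", "EAN", "5057723043", "1299"),
      ("xxx", "yyy", "MPN", "EVBD", "1299")
    ).toDF("id1", "id2", "name", "value", "epid")

    build(piDf).show(false)
    spark.stop()
  }
}
```

    Because concat appends the EPID struct after the collected list, EPID always ends up last; the order of the other entries still depends on collect_list and is not guaranteed.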
    
  Answer 2 · ayplam · 5 years ago · score 0

    You're close. I believe you're looking for something like this:

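    // Build the extra EPID rows. Drop the original value column first, otherwise
    // renaming epid -> value leaves two columns named "value" and the select is ambiguous.
    val pi_df2 = pi_df.drop("value").withColumn("name", lit("EPID")).
    withColumnRenamed("epid", "value").
    select("id1", "id2", "name","value")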
    
    pi_df.select("id1", "id2", "name","value").
    union(pi_df2).withColumn("item_specific", struct(col("name"), col("value"))).
    groupBy(col("id1"), col("id2")).
    agg(collect_list(col("item_specific")).alias("item_specifics")).
    write.json(...)
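    Since the goal is to write to MySQL rather than to JSON files, one option is to keep id1 and id2 as ordinary columns, serialize only item_specifics with to_json, and write over JDBC. This is a sketch assuming Spark 2.4+, MySQL Connector/J 8 on the classpath (driver class com.mysql.cj.jdbc.Driver), and a hypothetical target table item_specifics_json; the object name, table name, and argument order are illustrative, and the JDBC URL and credentials are placeholders passed on the command line.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object ItemSpecificsToMySql {
  // One row per (id1, id2): item_specifics becomes a JSON array string.
  def toJsonRows(piDf: DataFrame): DataFrame = {
    // EPID rows built with select, so no duplicate "value" column appears
    val epidRows = piDf.select(col("id1"), col("id2"), lit("EPID").as("name"), col("epid").as("value"))
    piDf.select("id1", "id2", "name", "value")
      .union(epidRows)
      .groupBy("id1", "id2")
      .agg(collect_list(struct(col("name"), col("value"))).as("item_specifics"))
      // array<struct<name,value>> -> JSON string such as [{"name":"EAN","value":"5057723043"},...]
      .withColumn("item_specifics", to_json(col("item_specifics")))
  }

  // Append the rows to a MySQL table over JDBC (url, table and credentials are placeholders).
  def writeToMySql(df: DataFrame, url: String, table: String, user: String, password: String): Unit = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver") // assumes Connector/J 8.x
    df.write.mode(SaveMode.Append).jdbc(url, table, props)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("item-specifics-mysql").getOrCreate()
    import spark.implicits._

    val piDf = Seq(
      ("xxx", "yyy", "EAN", "5057723043", "1299"),
      ("xxx", "yyy", "MPN", "EVBD", "1299")
    ).toDF("id1", "id2", "name", "value", "epid")

    val out = toJsonRows(piDf)
    out.show(false)
    // Only write when a JDBC URL, user and password are given, e.g. jdbc:mysql://host:3306/db
    if (args.length >= 3) writeToMySql(out, args(0), "item_specifics_json", args(1), args(2))
    spark.stop()
  }
}
```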
    

  Answer 3 · m-bhole · 5 years ago · score 0

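    Here is what you need to do (this builds the JSON in a UDF with scala.util.parsing.json.JSONObject, which is deprecated):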

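    import scala.util.parsing.json.JSONObject // deprecated; from the scala-parser-combinators module
    import scala.collection.mutable.WrappedArray
    import org.apache.spark.sql.functions._ // already imported in spark-shell

    // UDF that builds the final JSON string for one (id1, id2, epid) group
    val jsonFun = udf((id1: String, id2: String, item_specifics: WrappedArray[Map[String, String]], epid: String) => {
      // Add epid to item_specifics as one more single-entry map
      val item_withEPID = item_specifics :+ Map("epid" -> epid)

      // Turn each single-entry map into {"name": key, "value": value} and join them into a JSON array string.
      // replace("\\", "") strips escape backslashes (it would also mangle values that need escaping).
      val item_specificsArray = item_withEPID
        .map(m => JSONObject(Map("name" -> m.keys.head, "value" -> m.values.head)).toString().replace("\\", ""))
        .mkString("[", ",", "]")

      // Add id1 and id2 to the output JSON; .toSeq makes JSONObject insert the array as-is instead of as a quoted string
      val m = Map("id1" -> id1, "id2" -> id2, "item_specifics" -> item_specificsArray.toSeq)
      JSONObject(m).toString().replace("\\", "")
    })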
    
    val pi_df = Seq(("xxx","yyy","EAN","5057723043","1299"), ("xxx","yyy","MPN","EVBD","1299")).toDF("id1","id2","name","value","epid")

    // Add epid as a grouping column; otherwise it is not available after groupBy and aggregation
    val df = pi_df.groupBy(col("id1"), col("id2"), col("epid"))
      .agg(collect_list(map(col("name"), col("value")).as("map")).as("item_specifics"))
      .withColumn("item_specifics", jsonFun($"id1", $"id2", $"item_specifics", $"epid"))
    
    scala> df.show(false)
    +---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |id1|id2|epid|item_specifics                                                                                                                                                      |
    +---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |xxx|yyy|1299|{"id1" : "xxx", "id2" : "yyy", "item_specifics" : [{"name" : "MPN", "value" : "EVBD"},{"name" : "EAN", "value" : "5057723043"},{"name" : "epid", "value" : "1299"}]}|
    +---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    Contents of the item_specifics column (formatted):

    {
        "id1": "xxx",
        "id2": "yyy",
        "item_specifics": [{
            "name": "MPN",
            "value": "EVBD"
        }, {
            "name": "EAN",
            "value": "5057723043"
        }, {
            "name": "epid",
            "value": "1299"
        }]
    }