
How to use from_json with the schema as a string (i.e., a JSON-encoded schema)?

  •  M80  ·  asked 7 years ago

    from_json has a variant that takes the schema as a String, but I could not find a sample of it. Please advise what is wrong in the following code.

    Error

    Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
    extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',
    
    == SQL ==
    STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING )  )  ) 
    -------^^^
    
    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
    

    public static void main(String[] args) throws AnalysisException {
        String master = "local[*]";
        String brokers = "quickstart:9092";
        String topics = "simple_topic_6";
    
        SparkSession sparkSession = SparkSession
                .builder().appName(EmployeeSchemaLoader.class.getName())
                .master(master).getOrCreate();
    
       String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
                "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING )  )  ) ";
    
        SparkContext context = sparkSession.sparkContext();
        context.setLogLevel("ERROR");
        SQLContext sqlCtx = sparkSession.sqlContext();
    
        Dataset<Row> employeeDataset = sparkSession.readStream().
                format("kafka").
                option("kafka.bootstrap.servers", brokers)
                .option("subscribe", topics).load();
        employeeDataset.printSchema();
        employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
        employeeDataset = employeeDataset.withColumn("employeeRecord",
                functions.from_json(employeeDataset.col("strValue"),employeeSchema, new HashMap<>()));
    
        employeeDataset.printSchema();
        employeeDataset.createOrReplaceTempView("employeeView");
    
        sparkSession.catalog().listTables().show();
    
        sqlCtx.sql("select * from employeeView").show();
    }
    
    1 Answer  |  last active 6 years ago
  •  Jacek Laskowski  ·  answered 7 years ago

    Your question helped me discover that the variant of from_json that takes the schema as a String was only recently added to Spark's Scala API, landing in the upcoming 2.3.0. I had long believed that Spark's Scala API was always the most feature-rich, and your question helped me learn that this was not meant to be so before the change in 2.3.0 (!)

    Back to your question: you can actually define a string-based schema in either JSON or DDL format.
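
    As a hedged aside before the walkthrough: the ParseException in the question trips over the '(' in STRUCT ( ... ); the DDL form Spark's parser understands uses angle brackets, which is exactly what dt.sql prints later in this answer. A sketch of the question's schema in that form:

    // The question's schema in DDL form: angle brackets, not parentheses.
    val employeeSchemaAsDDL =
      "STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, " +
      "`addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>"

    The JSON route is what the rest of this answer walks through, starting from a schema built programmatically: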

    import org.apache.spark.sql.types._
    val addressesSchema = new StructType()
      .add($"city".string)
      .add($"state".string)
      .add($"zip".string)
    val schema = new StructType()
      .add($"firstName".string)
      .add($"lastName".string)
      .add($"email".string)
      .add($"addresses".array(addressesSchema))
    scala> schema.printTreeString
    root
     |-- firstName: string (nullable = true)
     |-- lastName: string (nullable = true)
     |-- email: string (nullable = true)
     |-- addresses: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- city: string (nullable = true)
     |    |    |-- state: string (nullable = true)
     |    |    |-- zip: string (nullable = true)
    

    That seems to match your schema, doesn't it?
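
    As an aside, the $"city".string notation relies on spark-shell implicits (spark.implicits._); outside the shell, a minimal equivalent sketch builds the same schema with explicit fields:

    import org.apache.spark.sql.types._
    // Same schema, no implicits needed; StructField defaults to nullable = true.
    val addressesSchemaExplicit = StructType(Seq(
      StructField("city", StringType),
      StructField("state", StringType),
      StructField("zip", StringType)))
    val schemaExplicit = StructType(Seq(
      StructField("firstName", StringType),
      StructField("lastName", StringType),
      StructField("email", StringType),
      StructField("addresses", ArrayType(addressesSchemaExplicit))))

    Calling printTreeString on schemaExplicit should produce the same tree as above.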

    With that, converting the schema to a JSON-encoded string becomes a breeze, using the json method:

    val schemaAsJson = schema.json
    

    schemaAsJson is exactly your JSON string, which looks pretty... hmmm... complex. For display purposes I would rather use the prettyJson method:

    scala> println(schema.prettyJson)
    {
      "type" : "struct",
      "fields" : [ {
        "name" : "firstName",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "lastName",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "email",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "addresses",
        "type" : {
          "type" : "array",
          "elementType" : {
            "type" : "struct",
            "fields" : [ {
              "name" : "city",
              "type" : "string",
              "nullable" : true,
              "metadata" : { }
            }, {
              "name" : "state",
              "type" : "string",
              "nullable" : true,
              "metadata" : { }
            }, {
              "name" : "zip",
              "type" : "string",
              "nullable" : true,
              "metadata" : { }
            } ]
          },
          "containsNull" : true
        },
        "nullable" : true,
        "metadata" : { }
      } ]
    }
    

    That's your schema in JSON.

    You can load the JSON-encoded schema back into a DataType and "validate" the JSON string (using DataType.fromJson, which from_json uses under the covers):

    import org.apache.spark.sql.types.DataType
    val dt = DataType.fromJson(schemaAsJson)
    scala> println(dt.sql)
    STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>
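
    A quick sanity check, as a sketch: DataType equality is structural, so a clean JSON round-trip should compare equal to the original schema.

    assert(dt == schema)  // the round-trip through schema.json preserved the schema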
    

    Everything seems fine. Mind if I check it out with a sample dataset?

    val rawJsons = Seq("""
      {
        "firstName" : "Jacek",
        "lastName" : "Laskowski",
        "email" : "jacek@japila.pl",
        "addresses" : [
          {
            "city" : "Warsaw",
            "state" : "N/A",
            "zip" : "02-791"
          }
        ]
      }
    """).toDF("rawjson")
    val people = rawJsons
      .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
      .select("json.*") // <-- flatten the struct field
      .withColumn("address", explode($"addresses")) // <-- explode the array field
      .drop("addresses")  // <-- no longer needed
      .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
    scala> people.show
    +---------+---------+---------------+------+-----+------+
    |firstName| lastName|          email|  city|state|   zip|
    +---------+---------+---------------+------+-----+------+
    |    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
    +---------+---------+---------------+------+-----+------+
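
    Applied back to the question's streaming pipeline, a minimal Scala sketch (broker and topic values taken from the question; the same schemaAsJson string also works from Java via the functions.from_json(Column, String, java.util.Map) overload the question already uses):

    import org.apache.spark.sql.functions.from_json
    // Assumes spark-shell (spark and $ in scope), like the rest of this answer.
    val employees = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "quickstart:9092")
      .option("subscribe", "simple_topic_6")
      .load()
      .withColumn("strValue", $"value".cast("string"))
      .withColumn("employeeRecord",
        from_json($"strValue", schemaAsJson, Map.empty[String, String]))
    employees.printSchema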