
Consuming nested JSON messages from Kafka with ClickHouse

  • ramazan polat  · Tech Community  · 4 years ago

    ClickHouse can definitely read JSON messages from Kafka, as long as they are flat JSON documents.

    We specify this in ClickHouse with kafka_format = 'JSONEachRow'.

    This is how we currently use it:

    CREATE TABLE topic1_kafka
    (
        ts Int64,
        event String,
        title String,
        msg String
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
    kafka_topic_list = 'topic1', kafka_num_consumers = 1, kafka_group_name = 'ch1', 
    kafka_format = 'JSONEachRow'
    

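    This works as long as producers send flat JSON to the topic that topic1_kafka consumes. But not all producers send flat JSON; most applications generate nested JSON documents like this: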

    {
      "ts": 1598033988,
      "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
      "location": [39.920515, 32.853708],
      "stats": {
        "temp": 71.2,
        "total_memory": 32,
        "used_memory": 21.2
      }
    }
    

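    Unfortunately, the JSON document above is not compatible with JSONEachRow, so ClickHouse cannot map the fields of the JSON document to the columns of the table.

    Is there any way to do this mapping?

    EDIT: We want to map the nested JSON to a flat table like this: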

    CREATE TABLE topic1
    (
        ts Int64,
        deviceId String,
        location_1 Float64,
        location_2 Float64,
        stats_temp Float64,
        stats_total_memory Float64,
        stats_used_memory Float64
    ) ENGINE = MergeTree()
    ORDER BY ts;
    
  • Answer by vladimir  · 4 years ago

    It looks like one way is to consume the "raw" message as a String and then parse it with the JSON functions in the consumer materialized view.

    WITH '{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": 71.2, "total_memory": 32, "used_memory": 21.2 }}' AS raw
    SELECT 
      JSONExtractUInt(raw, 'ts') AS ts,
      JSONExtractString(raw, 'deviceId') AS deviceId,
      arrayMap(x -> toFloat32(x), JSONExtractArrayRaw(raw, 'location')) AS location,
      JSONExtract(raw, 'stats', 'Tuple(temp Float64, total_memory Float64, used_memory Float64)') AS stats,
      stats.1 AS temp,
      stats.2 AS total_memory,
      stats.3 AS used_memory;
    
    /*
    ┌─────────ts─┬─deviceId─────────────────────────────┬─location──────────────┬─stats────────────────────────┬─temp─┬─total_memory─┬────────used_memory─┐
    │ 1598033988 │ cf060111-dbe6-4aa8-a2d0-d5aa17f45663 │ [39.920513,32.853706] │ (71.2,32,21.200000000000003) │ 71.2 │           32 │ 21.200000000000003 │
    └────────────┴──────────────────────────────────────┴───────────────────────┴──────────────────────────────┴──────┴──────────────┴────────────────────┘
    */
    

    Remark: for floating-point numbers, use the type Float64, not Float32 (see the related CH Issue 13962).
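    The query above only demonstrates the extraction on a literal string. A minimal sketch of how it could be wired into the consumer, under these assumptions (none of which come from the original answer): the JSONAsString input format is available in your ClickHouse version, so each Kafka message lands whole in a single String column; topic1 already exists as the flat MergeTree table from the question (MergeTree needs a sorting key, e.g. ORDER BY ts); and the names topic1_kafka_raw, topic1_mv and the consumer group 'ch1_raw' are hypothetical. A separate group name is used so this table does not split partitions with the existing topic1_kafka consumer in group 'ch1'.

```sql
-- Hypothetical Kafka table: each message is read whole into one String column.
-- Assumes the JSONAsString input format is supported by your ClickHouse version.
CREATE TABLE topic1_kafka_raw
(
    raw String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'ch1_raw',  -- hypothetical group, distinct from 'ch1'
    kafka_num_consumers = 1,
    kafka_format = 'JSONAsString';

-- Hypothetical consumer view: parses every raw message and writes one flat row into topic1.
-- JSON array indexes are 1-based, so ('location', 1) is the first coordinate.
-- JSONExtractFloat returns Float64, in line with the remark above.
CREATE MATERIALIZED VIEW topic1_mv TO topic1 AS
SELECT
    JSONExtractInt(raw, 'ts') AS ts,
    JSONExtractString(raw, 'deviceId') AS deviceId,
    JSONExtractFloat(raw, 'location', 1) AS location_1,
    JSONExtractFloat(raw, 'location', 2) AS location_2,
    JSONExtractFloat(raw, 'stats', 'temp') AS stats_temp,
    JSONExtractFloat(raw, 'stats', 'total_memory') AS stats_total_memory,
    JSONExtractFloat(raw, 'stats', 'used_memory') AS stats_used_memory
FROM topic1_kafka_raw;
```

    Note that the JSONExtract* functions return the type's default value (0 or an empty string) when a key is missing or has the wrong type, so a malformed message produces a zero-filled row rather than an error.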


    Using the standard data types instead requires changing the schema of the JSON:

    1. Represent stats as a Tuple
    CREATE TABLE test_tuple_field
    (
        ts Int64,
        deviceId String,
        location Array(Float32),
        stats Tuple(Float32, Float32, Float32)
    ) ENGINE = MergeTree()
    ORDER BY ts;
    
    
    INSERT INTO test_tuple_field FORMAT JSONEachRow 
    { "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": [71.2, 32, 21.2]};
    
    2. Represent stats as a Nested structure
    CREATE TABLE test_nested_field
    (
        ts Int64,
        deviceId String,
        location Array(Float32),
        stats Nested (temp Float32, total_memory Float32, used_memory Float32)
    ) ENGINE = MergeTree()
    ORDER BY ts;
    
    
    SET input_format_import_nested_json=1;
    INSERT INTO test_nested_field FORMAT JSONEachRow 
    { "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": [71.2], "total_memory": [32], "used_memory": [21.2] }};
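    With either schema, the flat columns from the question can still be produced at query time. A sketch, assuming the two test tables created above; the aliases simply mirror the topic1 layout from the question. Tuple elements are addressed by 1-based position, and each sub-column of a Nested structure is stored as an Array, so the single value is taken as element [1].

```sql
-- Tuple variant: tupleElement(stats, 1) is equivalent to stats.1.
SELECT
    ts,
    deviceId,
    location[1] AS location_1,
    location[2] AS location_2,
    tupleElement(stats, 1) AS stats_temp,
    tupleElement(stats, 2) AS stats_total_memory,
    tupleElement(stats, 3) AS stats_used_memory
FROM test_tuple_field;

-- Nested variant: every stats.* sub-column is an Array(Float32); take its first element.
SELECT
    ts,
    deviceId,
    location[1] AS location_1,
    location[2] AS location_2,
    stats.temp[1] AS stats_temp,
    stats.total_memory[1] AS stats_total_memory,
    stats.used_memory[1] AS stats_used_memory
FROM test_nested_field;
```

    ClickHouse returns the element type's default value (0 for Float32) for an out-of-range array index instead of raising an error, so an empty Nested array yields 0 here. If a row can carry several stats entries, ARRAY JOIN stats is the usual way to unfold them into separate rows.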
    

    See the related answer: ClickHouse JSON parse exception: Cannot parse input: expected ',' before .