代码之家  ›  专栏  ›  技术社区  ›  ethrbunny

ksql-从json数组创建流

  •  0
  • ethrbunny  · 技术社区  · 6 年前

    我的卡夫卡主题是以这种格式推送数据(来自 收藏 )以下内容:

    [{"values":[100.000080140372],"dstypes":["derive"],"dsnames":["value"],"time":1529970061.145,"interval":10.000,"host":"k5.orch","plugin":"cpu","plugin_instance":"23","type":"cpu","type_instance":"idle","meta":{"network:received":true}}]
    

    它是数组、int和float的组合…整个过程都在一个json数组中。结果我花了很多时间 ksql公司 对这些数据做任何事情。

    当我创建“默认”流时

    create stream cd_temp with (kafka_topic='ctd_test', value_format='json');
    

    我得到这个结果:

    ksql> describe cd_temp;
    
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
    -------------------------------------
    

    任何 选择 将返回rowtime和rowkey的8位十六进制值。

    我花了一些时间试图提取json字段,但没有成功。我担心的是:

    ksql> print 'ctd_test' from beginning;
    Format:JSON
    com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode
    

    有没有可能这个话题不能用在 ksql公司 是吗?有没有一种技术可以将外部数组解包到内部有趣的位?

    1 回复  |  直到 6 年前
        1
  •  3
  •   Andrew Coates    6 年前

    在编写本文时(2018年6月),ksql无法处理json消息,因为整个消息都嵌入到顶级数组中。有一个 github issue to track this .我建议在这个问题上增加1+1的投票权,以提高优先权。

    另外,我注意到create stream语句没有定义json消息的模式。虽然这在这种情况下没有帮助,但对于其他json输入格式,您需要这样做,即您创建的语句应该如下所示:

    create stream cd_temp (values ARRAY<DOUBLE>, dstypes ARRAY<VARCHAR>, etc) with (kafka_topic='ctd_test', value_format='json');