
How do I get the partition key or other columns when using GROUP BY in KSQL?

  •  Shakeel  · 6 years ago

    I am using a group by expression in my query.

    Create a stream from the topic:

    CREATE STREAM events_stream \
          ( \
         account VARCHAR, \
         event_id VARCHAR, \
         user_name VARCHAR, \
         event_name VARCHAR, \
         source VARCHAR, \
         message VARCHAR, \
         timestamp STRUCT<iMillis INTEGER>) \
        WITH (KAFKA_TOPIC='console_failure', VALUE_FORMAT='JSON');
    

    Create a table from the above stream:

    ksql> CREATE TABLE events_table AS \
          SELECT source, count(*) \
          FROM events_stream \
          WINDOW TUMBLING (SIZE 60 SECONDS) \
          WHERE account = '1111111111' \
                      GROUP BY source \
                      HAVING count(*) > 3;
    

    Produce this message 4 times:

    ip="10.10.10.10"
    
    data = {
            "account": "1111111111",
            "event_id": "4cdabe46-690d-494a-a37e-6e455781d8b4",
            "user_name": "shakeel",
            "event_name": "some_event",
            "source": "127.0.0.1",
            "message": "message related to event",
            "timestamp": {
                "iMillis": 1547543309000
                 }
            }
    
    producer.send('console_failure', key='event_json', value=dict(data))
    

    This all works as expected!

    ksql> select * from events_table;
    1550495772262 | 10.10.10.10 : Window{start=1550495760000 end=-} | 10.10.10.10 | 4
    ksql> 
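    As an aside, the window start KSQL reports can be derived from the record timestamp: a tumbling window assigns each record to the window whose start is the timestamp floored to the window size. A quick Python check of the output above (a sketch, assuming the 60-second tumbling window from the query):

```python
# A tumbling window of SIZE 60 SECONDS assigns each record to the window
# whose start is the record timestamp (ms) floored to a 60,000 ms boundary.
WINDOW_MS = 60 * 1000

def window_start(ts_ms: int) -> int:
    return (ts_ms // WINDOW_MS) * WINDOW_MS

# ROWTIME 1550495772262 from the query output falls in the reported window:
print(window_start(1550495772262))  # 1550495760000 -> Window{start=1550495760000}
```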
    

    After trying the following, I understand that perhaps we cannot fetch other columns when using a GROUP BY statement:

    ksql> CREATE TABLE events_table1 AS \
    >      SELECT source, event_id, \
    >               count(*) \
    >     FROM events_stream \
    >     WINDOW TUMBLING (SIZE 60 SECONDS) \
    >      WHERE account = '1111111111' \
    >                  GROUP BY source \
    >                  HAVING count(*) > 3;
    Group by elements should match the SELECT expressions.
    ksql>
    

    Can we achieve this by re-keying the stream?

    After reading this, I re-keyed the stream on the event_id field, but I'm not sure how to use it in the CREATE TABLE statement.

    ksql> CREATE STREAM events_stream_rekey AS SELECT * FROM events_stream PARTITION BY event_id;
    
     Message
    ----------------------------
     Stream created and running
    ----------------------------
    ksql>
    ksql> SELECT ROWKEY, EVENT_ID FROM events_stream_rekey;
    4cdabe46-690d-494a-a37e-6e455781d8b4 | 4cdabe46-690d-494a-a37e-6e455781d8b4
    ksql>
    
    ksql> CREATE TABLE  events_table2 AS \
    >      SELECT source, \
    >               count(*), \
    >     WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'),
    >     WINDOW TUMBLING (SIZE 60 SECONDS) \
    >      WHERE account = '1111111111' \
    >                  GROUP BY source \
    >                  HAVING count(*) > 3;
    line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
    
    
    

    KSQL version details: CLI v5.1.0, Server v5.1.0

    -------------------------- Steps to reproduce --------------------------

    Producer script:

    import time
    import uuid
    from kafka import KafkaProducer
    from json import dumps
    
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:
                             dumps(x).encode('utf-8'))
    
    for i in range(1, 5):
        time.sleep(1)
        data = {
            "account": "1111111111",
            "event_id": str(uuid.uuid4()),
            "user_name": "user_{0}".format(i),
            "event_name": "event_{0}".format(i),
            "source": "10.0.9.1",
            "message": "message related to event {0}".format(i),
            "timestamp": {
                "iMillis": int(round(time.time() * 1000))
            }
        }
        time.sleep(2)
        producer.send('testing_topic', value=data)
    

    Output from the testing topic (read with a plain consumer script):

    {'account': '1111111111', 'event_id': 'c186ba8a-2402-428a-a5d8-c5b8279e14af', 'user_name': 'user_1', 'event_name': 'event_1', 'source': '10.0.9.1', 'message': 'message related to event 1', 'timestamp': {'iMillis': 1551296878444}}
    {'account': '1111111111', 'event_id': '4c45bff7-eb40-48a8-9972-301ad24af9ca', 'user_name': 'user_2', 'event_name': 'event_2', 'source': '10.0.9.1', 'message': 'message related to event 2', 'timestamp': {'iMillis': 1551296881465}}
    {'account': '1111111111', 'event_id': '4ee14303-e6d1-4847-ae3d-22b49b3ce6eb', 'user_name': 'user_3', 'event_name': 'event_3', 'source': '10.0.9.1', 'message': 'message related to event 3', 'timestamp': {'iMillis': 1551296884469}}
    {'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}
    

    Expected result: if more than 3 messages containing the same source address arrive for the same account within the window time, then I want to get the complete triggering message immediately (the 4th message in my case, shown below). Is this achievable with KSQL?

    {'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}
    
  •  Andrew Coates  · 6 years ago

    In addition to Robin's answer, this error:

    line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
    

    means that your WITH clause is in the wrong place. The correct pattern is:

    CREATE TABLE <table name> WITH(...) AS SELECT ...
    

    So your statement should be:

    ksql> CREATE TABLE events_table2
    >     WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id')
    >     AS
    >     SELECT source, count(*)
    >     FROM events_stream_rekey
    >     WINDOW TUMBLING (SIZE 60 SECONDS)
    >     WHERE account = '1111111111'
    >     GROUP BY source
    >     HAVING count(*) > 3;
    
  •  Robin Moffatt  · 6 years ago

    The message itself actually tells you the problem :)

    Group by elements should match the SELECT expressions.

    You need source in both the SELECT and the GROUP BY:

    ksql> SELECT source, count(*) \
    >      FROM events_stream \
    >      WINDOW TUMBLING (SIZE 60 SECONDS) \
    >      WHERE account = '1111111111' \
    >                  GROUP BY source \
    >                  HAVING count(*) > 3;
    127.0.0.1 | 4
    ^CQuery terminated
    

    To include additional columns, make sure you add them to the GROUP BY too:

    ksql> SELECT source, event_id, count(*) \
    >      FROM events_stream \
    >      WINDOW TUMBLING (SIZE 60 SECONDS) \
    >      WHERE account = '1111111111' \
    >                  GROUP BY source, event_id \
    >                  HAVING count(*) > 3;
    127.0.0.1 | 4cdabe46-690d-494a-a37e-6e455781d8b4 | 4
    

    Edit to answer the updated question:

    I don't think this is [easily] possible in SQL (or KSQL). You can achieve something similar by including the timestamp in the aggregation, e.g.:

    CREATE TABLE source_alert AS \
    SELECT source, COUNT(*), MAX(timestamp) \
    FROM event_stream WINDOW TUMBLING (SIZE 60 SECONDS) \
    GROUP BY source \
    HAVING COUNT(*) > 1;
    

    Then take the resulting table and join it to the event stream:

    SELECT * \
      FROM event_stream e \
           INNER JOIN \
           source_alert a ON e.source = a.source \
    WHERE e.timestamp = a.timestamp;
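
    To make the intent of the aggregate-plus-join approach concrete, here is a plain-Python sketch of the same logic: count events per (window, source) for the account and surface the message that pushes the count past the threshold. This only illustrates the semantics; it is not how KSQL executes it, and the function and variable names are hypothetical.

```python
from collections import defaultdict

WINDOW_MS = 60 * 1000   # matches WINDOW TUMBLING (SIZE 60 SECONDS)
THRESHOLD = 3           # matches HAVING count(*) > 3

def find_alert_events(events, account='1111111111'):
    """Return each event that pushes a (window, source) count past THRESHOLD,
    mimicking the windowed aggregate plus join back to the stream."""
    counts = defaultdict(int)
    alerts = []
    for e in events:
        if e['account'] != account:
            continue
        # Assign the record to its tumbling window, then count per source.
        window = e['timestamp']['iMillis'] // WINDOW_MS
        key = (window, e['source'])
        counts[key] += 1
        if counts[key] == THRESHOLD + 1:  # first time count(*) > 3
            alerts.append(e)              # the 4th message, as in the question
    return alerts
```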