代码之家  ›  专栏  ›  技术社区  ›  zimvak

如何在KSQL中选择/分配分区

  •  0
  • zimvak  · 技术社区  · 4 年前

    我认为这是一个相关的问题: Using Kafka KSQL to select all events of a topic from a specific partition with given offset

    如何通过KSQL选择/分配分区?我试图阻止KSQL从所有分区读取,因为必要的数据只存在于一个分片中。

    例如:

    CLI v5.4.1,服务器v5.4.1

    SET 'auto.offset.reset'='earliest';
    CREATE STREAM SOURCE_STREAM (FIELD_1 BIGINT)
        WITH (
            VALUE_FORMAT='AVRO',
            KAFKA_TOPIC='source_topic',
            PARTITIONS=2,
            REPLICAS=1
        );
    

    插入一些存在于分区0和分区1中的模拟数据(不是真正分配的,但例如)

    INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (123);  # say in partition 0
    INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (456);  # say in partition 1
    

    对于消费者API,可以执行以下操作:

    consumer.assign(TopicPartition(topic=source_topic, partition=0))
    consumer.assign(TopicPartition(topic=source_topic, partition=1))
    consumer.get()
    

    然而,对于当前的API,我不确定如何在客户端级别或服务器属性级别“分配”分区。以下派生流将从所有分区读取:

    CREATE STREAM DERIVATIVE_STREAM AS 
        SELECT
            FIELD_1
        FROM SOURCE_STREAM
        EMIT CHANGES;
    
    EXPLAIN CSAS_DERIVATIVE_STREAM_n;
    

    (我知道我可以使用 WHERE 用于筛选数据的语句,但我想显式地从分区0|1读取)

    0 回复  |  直到 4 年前
        1
  •  1
  •   Robin Moffatt    4 年前

    ksqlDB不是这样工作的。您可以使用SQL来声明 什么 你想要,不是 怎样 你想要它。

    正如你在问题中所说,你可以使用 WHERE 要将谓词应用于查询,可以使用 ROWKEY 以针对消息键值。

    我想RDBMS世界中的并行将是基于成本的优化器的执行计划提示。

    如果您想将此作为增强请求记录到ksqlDB,请在此处执行: https://github.com/confluentinc/ksql/issues/new

    推荐文章