我认为这是一个相关的问题:
Using Kafka KSQL to select all events of a topic from a specific partition with given offset
如何通过KSQL选择/分配分区?我试图阻止KSQL从所有分区读取,因为必要的数据只存在于一个分片中。
例如:
CLI v5.4.1,服务器v5.4.1
SET 'auto.offset.reset'='earliest';
CREATE STREAM SOURCE_STREAM (FIELD_1 BIGINT)
WITH (
VALUE_FORMAT='AVRO',
KAFKA_TOPIC='source_topic',
PARTITIONS=2,
REPLICAS=1
);
插入一些存在于分区0和分区1中的模拟数据(不是真正分配的,但例如)
INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (123); # say in partition 0
INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (456); # say in partition 1
对于消费者API,可以执行以下操作:
consumer.assign(TopicPartition(topic=source_topic, partition=0))
consumer.assign(TopicPartition(topic=source_topic, partition=1))
consumer.get()
然而,对于当前的API,我不确定如何在客户端级别或服务器属性级别“分配”分区。以下派生流将从所有分区读取:
CREATE STREAM DERIVATIVE_STREAM AS
SELECT
FIELD_1
FROM SOURCE_STREAM
EMIT CHANGES;
EXPLAIN CSAS_DERIVATIVE_STREAM_n;
(我知道我可以使用
WHERE
用于筛选数据的语句,但我想显式地从分区0|1读取)