代码之家  ›  专栏  ›  技术社区  ›  Guy Segal

试图在卡夫卡建立消费者和生产者

  •  0
  • Guy Segal  · 技术社区  · 6 年前

    我试图让一个简单的生产者-消费者流通过卡夫卡,使用 node-rdkafka

    debug: 'all' 模式,这是我从日志中得到的:

    test [0]: MessageSet with 1 message(s) delivered

    消费者: Fetch topic test [0] at offset 38 (v2)

    当生成消息时,使用者正在更改偏移量这一事实使我相信与代理的连接已正确设置和验证。

    从不调用此事件:

    consumer.on('data', function(m) {
        console.log("consumed", m)
    });
    

    我创建了一个用于测试的演示项目,您需要一个支持SASL\u SSL协议的Kafka代理才能使用它:

    https://github.com/guysegal/kafka-example

    具体来说,这是消费者代码:

    https://github.com/guysegal/kafka-example/blob/master/src/consumer.ts

    生产商代码:

    https://github.com/guysegal/kafka-example/blob/master/src/producer.ts

    1 回复  |  直到 6 年前
        1
  •  0
  •   bittu    6 年前

    您可能需要设置值 'auto.offset.reset': 'earliest' 再开始消费。

    auto.offset.reset 
    

    如果使用者从没有最后一个已知状态的组开始,则此属性决定使用者的开始位置 . 例如,对于新的组id。

    改变价值的原因 auto.offset.reset 因为如果您首先生成,然后启动consumer,那么主题偏移量已经增加,consumer将从最新偏移量(增加的偏移量)开始,并从该点开始读取消息。

    'earliest' 消费者从主题的第一条可用消息开始。