
Kafka is assigning the same partition to consumers in the same group

  • Shashwat  · Tech Community  · 6 years ago

    Kafka - Multiple Consumers From Same Group Assigned Same Partition

    I have just started learning Kafka and Node.js. I have written a consumer as follows:

    // consumer.js
    const kafka = require('kafka-node');
    var client = new kafka.Client('localhost:2181');
    var topics = [{
        topic: 'topic-4'
    }];
    
    var options = {
        groupId: 'kafka-node-group-2',
        autoCommit: true,
        fetchMaxWaitMs: 1000,
        fetchMaxBytes: 1024 * 1024,
        encoding: 'buffer'
    };
    var consumer = new kafka.HighLevelConsumer(client, topics, options);
    
    // consumer.payloads has only one entry
    console.log('Topic', consumer.payloads[0].topic);
    console.log('Group', consumer.options.groupId);
    console.log('Assigned Partition:', consumer.payloads[0].partition);
    

    Output:

    Topic topic-4
    Group kafka-node-group-2
    Assigned Partition: 0
    

    topic-4 has four partitions.

    ./desc_topic.sh topic-4
    Topic:topic-4   PartitionCount:4    ReplicationFactor:1 Configs:
        Topic: topic-4  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: topic-4  Partition: 1    Leader: 2   Replicas: 2 Isr: 2
        Topic: topic-4  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
        Topic: topic-4  Partition: 3    Leader: 2   Replicas: 2 Isr: 2
    

    Edit

    I have also tried ConsumerGroup:

    var options = {
        host: 'localhost:2181',  // zookeeper host omit if connecting directly to broker (see kafkaHost below)
        groupId: 'Group-1',
        sessionTimeout: 15000,
        // // An array of partition assignment protocols ordered by preference.
        // // 'roundrobin' or 'range' string for built ins (see below to pass in custom assignment protocol)
        protocol: ['roundrobin']
    };
    var consumer = new kafka.ConsumerGroup(options, ['topic-4']);
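
For comparison, here is a minimal sketch of what I would expect a round-robin assignment to produce for four partitions and two group members. It is a simplified model and not kafka-node's actual implementation; the function name `assignRoundRobin` and the member IDs `consumer-a` and `consumer-b` are made up for illustration.

```javascript
// Simplified model of round-robin partition assignment inside one consumer group.
// This is NOT kafka-node's implementation; all names here are hypothetical.
function assignRoundRobin(memberIds, partitions) {
  // Sort members so that every member would compute the same result.
  const sortedMembers = [...memberIds].sort();
  const assignment = new Map(sortedMembers.map((id) => [id, []]));
  if (sortedMembers.length === 0) {
    return assignment; // no members, nothing to assign
  }
  // Hand out partitions one at a time, cycling through the members.
  partitions.forEach((partition, index) => {
    const owner = sortedMembers[index % sortedMembers.length];
    assignment.get(owner).push(partition);
  });
  return assignment;
}

const assignment = assignRoundRobin(['consumer-a', 'consumer-b'], [0, 1, 2, 3]);
for (const [member, owned] of assignment) {
  console.log(member, '->', owned.join(', '));
}
```

Under this model each of the two consumers would own two of the four partitions, which is the behaviour I was expecting from the group.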
    

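    The producer sends 100 messages, and a received message looks like the following. This is how I know which partition was assigned (not from the consumer object).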

    {
        topic: 'topic-4',
        value: '{"subject":"Message Id 30 "}',
        offset: 172,
        partition: 0,
        highWaterOffset: 173,
        key: null
    }
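
To check the distribution more systematically than by reading individual messages, the received messages could be tallied by their `partition` field. The sketch below is only an illustration: `countByPartition` is a hypothetical helper, and the sample messages are made up in the shape shown above rather than taken from a real run.

```javascript
// Hypothetical helper: count how many received messages came from each partition.
function countByPartition(messages) {
  const counts = {};
  for (const message of messages) {
    counts[message.partition] = (counts[message.partition] || 0) + 1;
  }
  return counts;
}

// Made-up sample data in the same shape as a received message.
const sample = [
  { topic: 'topic-4', partition: 0, offset: 172 },
  { topic: 'topic-4', partition: 0, offset: 173 },
  { topic: 'topic-4', partition: 2, offset: 5 }
];
console.log(countByPartition(sample));
```

In a real consumer the messages would be collected inside the `consumer.on('message', ...)` handler before being passed to the helper.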
    

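    When I run two instances of this consumer (same topic and same group), only one of them receives anything, and everything it receives comes from partition 0. Isn't this a problem?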

    Here is the producer code.

    const kafka = require('kafka-node');
    const Client = kafka.Client;
    var client = new Client('localhost:2181', 'my-client-id', {
      sessionTimeout: 300,
      spinDelay: 100,
      retries: 2
    });
    
    // For this demo we just log client errors to the console.
    client.on('error', function(error) {
      console.error(error);
    });
    
    var producer = new kafka.HighLevelProducer(client);
    
    producer.on('ready', function() {
        for (var i = 0; i <= 30; i++) {
            let id = 'Message Id ' + i + ' ';
            let msg = {
                'subject': id
            };
            var messageBuffer = Buffer.from(JSON.stringify(msg));
    
            // Create a new payload
            var payload = [{
                // topic: 'topic-', + (i%2+2),
                topic: 'topic-4',
                messages: messageBuffer,
                timestamp: Date.now(),
                attributes: 1 /* Use GZip compression for the payload */
            }];
    
            //Send payload to Kafka and log result/error
            producer.send(payload, function(error, result) {
                console.info('Sent payload to Kafka: ', payload);
                if (error) {
                    console.error('Error', error);
                } else {
                    var formattedResult = result[0];
                    console.log('result: ', result)
                }
            });
        }
    });
    
    // For this demo we just log producer errors to the console.
    producer.on('error', function(error) {
        console.error(error);
    });
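
Note that the payload above sets neither a key nor a partition. One way to rule out the producer as the cause might be to name the target partition explicitly. The sketch below only builds the payload objects and assumes the producer honors a `partition` field on each payload, which I have not confirmed for `HighLevelProducer`. `buildPayloads` is a hypothetical helper, not part of kafka-node.

```javascript
// Hypothetical helper: build payloads that name their target partition explicitly.
// Assumption: the producer honors a `partition` field on each payload.
function buildPayloads(topic, messageCount, partitionCount) {
  if (!Number.isInteger(partitionCount) || partitionCount <= 0) {
    throw new Error('partitionCount must be a positive integer');
  }
  const payloads = [];
  for (let i = 0; i < messageCount; i++) {
    payloads.push({
      topic: topic,
      messages: JSON.stringify({ subject: 'Message Id ' + i + ' ' }),
      partition: i % partitionCount // cycles through 0, 1, ..., partitionCount - 1
    });
  }
  return payloads;
}

const payloads = buildPayloads('topic-4', 8, 4);
console.log(payloads.map((p) => p.partition).join(' '));
```

Each element could then be sent with `producer.send([payload], callback)` in the same way as in the loop above. If messages then arrive from all four partitions, the single-partition behaviour would point at the producer side rather than at the consumer group.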
    