代码之家  ›  专栏  ›  技术社区  ›  Michal FaÅ¡ánek

PykaFaA消费者与终端消费者的冲突

  •  1
  • Michal FaÅ¡ánek  · 技术社区  · 6 年前

    我想实现一个分布式微服务架构上的消息传递的kafka。

    我正在使用 皮卡夫卡 实现了虚拟生产者和(平衡的)消费者。我把所有的消费者分配给 同一消费群体 . 我可以使用 python和控制台 同时,甚至运行时添加它们。

    不过,我对消费者有意见。我可以创建多个Python用户,甚至运行时添加它们。但是当我将控制台用户(卡夫卡控制台用户)添加到Python消费者的组时,我得到了互斥错误:

    从用户ID 'BiMalHalsMaBooBo.Pro。本地:1722EEA073-4BE4-9D978B7FB15B0B30B'(错误:{ PykaFak.Exist.unNeNeMeNid ID::(0, 1)})提交主题“B'MigalySimPayPosiple主题”偏移的错误

    此外,这两个(即使它们属于同一个消费群体)正在消耗消息(Python消费者在它们自己和控制台消费者之间平衡它们)。

    现在,我对卡夫卡是新的,但我的第一印象是,卡夫卡不应该对消费者的实施不可知,所以结合他们应该是可能的。我的理解是PykaFKA还是PikaFKA的实现?

    生产者:

    from pykafka import KafkaClient
    from time import sleep
    
    client = KafkaClient(hosts="localhost:9092")
    
    print(client.brokers)
    print(client.topics)
    
    topic = client.topics[b'michal_sample_topic']
    
    with topic.get_sync_producer() as producer:
    
    while True:
        producer.produce(
            bytes(
                input('Send test message:'), 
                'utf-8'
            )
        )
    

    消费者:

    from pykafka import KafkaClient
    
    client = KafkaClient(hosts="localhost:9092")
    
    print(client.brokers)
    print(client.topics)
    
    topic = client.topics[b'michal_sample_topic']
    
    balanced_consumer = topic.get_balanced_consumer(
        consumer_group=b'testing',
        auto_commit_enable=True,
        zookeeper_connect='localhost:2181'
    )
    
    for message in balanced_consumer:
        if message is not None:
            print(f'{message.offset} {message.value}')
    
    0 回复  |  直到 6 年前