代码之家  ›  专栏  ›  技术社区  ›  Mr_and_Mrs_D

python kafka:在发布新消息之前,是否有方法阻止消费者关注kafka主题?

  •  2
  • Mr_and_Mrs_D  · 技术社区  · 6 年前

    我有一个消费者订阅了一个测试主题,其中生产者线程定期发布。我希望能够阻止消费者线程,直到打开新消息,然后处理该消息并再次开始等待。我最接近的是:

    consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                             bootstrap_servers=[localhost_],
                             api_version=(0, 10), consumer_timeout_ms=1000)
    while True:
        print(consumer.poll(timeout_ms=5000))
    

    有没有更惯用的方法(或者这种方法有什么我看不到的严重问题)?

    新来的卡夫卡对这种设计的一般建议非常欢迎。完整(运行)示例:

    import time
    from threading import Thread
    
    import kafka
    from kafka import KafkaProducer, KafkaConsumer
    
    print('python-kafka:', kafka.__version__)
    
    def publish_message(producer_instance, topic_name, key, value):
        try:
            key_bytes = bytes(str(key), encoding='utf-8')
            value_bytes = bytes(str(value), encoding='utf-8')
            producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
            producer_instance.flush()
        except Exception as ex:
            print('Exception in publishing message\n', ex)
    
    localhost_ = 'localhost:9092'
    
    def kafka_producer():
        _producer = None
        try:
            _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                      api_version=(0, 10))
        except Exception as ex:
            print('Exception while connecting Kafka')
            print(str(ex))
        j = 0
        while True:
            publish_message(_producer, topic_name, value=j, key=j)
            j += 1
            time.sleep(5)
    
    if __name__ == '__main__':
        print('Running Producer..')
        topic_name = 'test'
        prod_thread = Thread(target=kafka_producer)
        prod_thread.start()
        consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                                 bootstrap_servers=[localhost_],
                                 api_version=(0, 10), consumer_timeout_ms=1000)
        # consumer.subscribe([topic_name])
        while True:
            print(consumer.poll(timeout_ms=5000))
    

    python-kafka: 1.3.5

    1 回复  |  直到 6 年前
        1
  •  3
  •   zsltg    6 年前

    无限循环中的轮询是 Kafka: The Definitive Guide 也。这是一个来自Java的摘录 Chapter 4. Kafka Consumers: Reading Data from Kafka 使用相同的想法:

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            ...
        }
    }
    

    这很好地解释了如何建议在Python中使用库。

    卡夫卡蟒蛇 (请参阅中的完整示例 A Tale of Two Kafka Clients )

    from kafka import KafkaConsumer
    ...
    kafka_consumer = Consumer(
    ...
    )
    consumer.subscribe([topic])
    
    running = True
    while running:
        message = kafka_consumer.poll()
    ...
    

    融合卡夫卡巨蟒 (请参阅中的完整示例 Introduction to Apache Kafka for Python Programmers )

    from confluent_kafka import Consumer, KafkaError
    ...
    c = Consumer(settings)
    
    c.subscribe(['mytopic'])
    
    try:
        while True:
            msg = c.poll(0.1)
    ...
    

    可能出现的另一个紧密相关的问题是如何处理消息。

    这部分代码可能依赖于外部依赖项(数据库、远程服务、网络文件系统等),这可能导致处理尝试失败。

    因此,实现重试逻辑可能是个好主意,您可以在博客文章中找到一个关于重试逻辑的良好描述。 Retrying consumer architecture in the Apache Kafka .