代码之家  ›  专栏  ›  技术社区  ›  EMP

如何使用py amqplib在多个队列上等待消息

  •  3
  • EMP  · 技术社区  · 15 年前

    我在用 py amqplib公司 在Python中访问RabbitMQ。应用程序不时接收有关某些MQ主题的侦听请求。

    当它第一次收到这样的请求时,它会创建一个AMQP连接和一个通道,并启动一个新线程来监听消息:

        connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
        channel = connection.channel()
    
        listener = AMQPListener(channel)
        listener.start()
    

    AMQP侦听器 很简单:

    class AMQPListener(threading.Thread):
        def __init__(self, channel):
            threading.Thread.__init__(self)
            self.__channel = channel
    
        def run(self):
            while True:
                self.__channel.wait()
    

    创建连接后,它订阅感兴趣的主题,如下所示:

    channel.queue_declare(queue = queueName, exclusive = False)
    channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
    channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
    
    def receive_callback(msg):
        self.queue.put(msg.body)
    
    channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
    

    第一次一切都很好。但是,在后续请求订阅另一个主题时失败。在随后的请求中,我重用AMQP连接和amqplister线程(因为我不想为每个主题启动新线程),并且当我调用 channel.queue_declare() 方法调用永远不会返回。我也试过在那个时候创建一个新的频道 连接通道() 电话也没回过。

    我能让它工作的唯一方法是为每个主题创建一个新的连接、通道和侦听线程(即路由密钥),但这真的不太理想。我怀疑是wait()方法以某种方式阻塞了整个连接,但我不知道该怎么办。当然,我应该能够使用一个侦听器线程接收具有多个路由键(甚至在多个通道上)的消息?

    一个相关的问题是:我该怎么做 停止 当不再感兴趣的话题时,侦听器线程?这个 频道。等待() 如果没有消息,呼叫将永远阻塞。我能想到的唯一方法是向队列发送一个会“毒害”它的伪消息,即被侦听器解释为停止的信号。

    1 回复  |  直到 15 年前
        1
  •  1
  •   Torsten    15 年前

    如果每个频道需要多个用户,请使用 基本消费() 使用 频道。等待() 之后。它将侦听通过 基本消费() . 确保为每个用户定义不同的消费者标签 基本消费() .

    使用 频道.基本取消(消费者标签) 如果要取消队列中的特定使用者(取消侦听特定主题)。