代码之家  ›  专栏  ›  技术社区  ›  Anirban B

rabbitmq中用于多队列的多消费者

  •  2
  • Anirban B  · 技术社区  · 7 年前

    我有两个队列,比如q1和q2,对应于具有绑定键b1和b2的e1和e2交换。我想并行运行消费函数,比如c1和c2,它们将分别侦听q1和q2。我尝试了以下方法:

    def c1():
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
        channel = connection.channel()
        channel.exchange_declare(exchange='e1', durable='true',
                             type='topic')
        result = channel.queue_declare(durable='false', queue='q1')
        queue_name = result.method.queue
        binding_key = "b1"
        channel.queue_bind(exchange='e1',
                           queue=queue_name,
                           routing_key=binding_key)
        channel.basic_consume(callback,queue=queue_name,no_ack=False)
        channel.start_consuming()
    
    def c2():
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
        channel = connection.channel()
        channel.exchange_declare(exchange='e2', durable='true',
                             type='topic')
        result = channel.queue_declare(durable='false', queue='q2')
        queue_name = result.method.queue
        binding_key = "b2"
        channel.queue_bind(exchange=e1,
                           queue=queue_name,
                           routing_key=binding_key)
        channel.basic_consume(callback,queue=queue_name,no_ack=False)
        channel.start_consuming()
    
    if __name__ == '__main__':
        c1()
        c2()
    

    然而,它只监听c1函数和c2函数,并没有得到执行。如何运行这两个函数? 提前谢谢。

    编辑:我在两个不同的模块(文件)中有方法c1和c1

    1 回复  |  直到 7 年前
        1
  •  6
  •   wolfoorin    7 年前

    为了同时运行这两个函数,需要一些多线程方法。请看一看 here 对于一些python示例。

    这是使用Process类修改的代码。它还可以使用线程或从操作系统显式运行它。

    import pika
    from multiprocessing import Process
    
    
    def callback():
        print 'callback got data'
    
    
    class c1():
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
            self.channel.exchange_declare(exchange='e1', durable='true', type='topic')
            result = self.channel.queue_declare(durable='false', queue='q1')
            queue_name = result.method.queue
            binding_key = "b1"
            self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key)
            self.channel.basic_consume(callback,queue=queue_name,no_ack=False)
    
        def run(self):
            self.channel.start_consuming()
    
    
    class c2():
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
            self.channel.exchange_declare(exchange='e2', durable='true', type='topic')
            result = self.channel.queue_declare(durable='false', queue='q2')
            queue_name = result.method.queue
            binding_key = "b2"
            self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key)
    
            self.channel.basic_consume(callback,queue=queue_name,no_ack=False)
    
        def run(self):
            self.channel.start_consuming()
    
    if __name__ == '__main__':
        subscriber_list = []
        subscriber_list.append(c1())
        subscriber_list.append(c2())
    
        # execute
        process_list = []
        for sub in subscriber_list:
            process = Process(target=sub.run)
            process.start()
            process_list.append(process)
    
        # wait for all process to finish
        for process in process_list:
            process.join()