
Multiprocessing with Pika and RabbitMQ queues

  •  0
  • iam.Carrot  · Tech Community  · 7 years ago

    I am working with RabbitMQ queues. I want to run multiple consumer object instances from a single program. Below is my Operator class, which creates 1 producer and 1 consumer:

    import pika


    class Operator(object):
    
        def __init__(self, delegate: callable, identifier):
            """
            Create a new instance of the Operator and initialize the connections
            """
            self._queue_details = self._get_queue_details()
            self._host_ip = self._queue_details['IP']
            self._port = self._queue_details['Port']
            self._username = self._queue_details['Username']
            self._password = self._queue_details['Password']
            self._input_queue_name = self._queue_details['ReadQueueName']
            self._output_queue_name = self._queue_details['WriteQueueName']
            self._error_queue_name = self._queue_details['ErrorQueueName']
            self._delegate = delegate
            self._identifier = identifier
            self._queue_connection = None
            self._input_channel = None
            self._output_channel = None
            self._error_channel = None
            self.is_busy = False
            self.mark_to_terminate = False
    
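        def __del__(self):
            # close the connection, but only if initialize_operator() actually opened one
            if self._queue_connection is not None and self._queue_connection.is_open:
                self._queue_connection.close()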
    
        @staticmethod
        def _initialize_channel(connection, queue_name, durable):
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=durable)
            return channel
    
        @staticmethod
        def _get_queue_details() -> dict:
            return ConfigurationManager().get_value('queueDetails')
    
        @staticmethod
        def _get_connection(username, password, host_ip, port):
            connection = pika.BlockingConnection(pika.ConnectionParameters(
                credentials=pika.PlainCredentials(username, password), host=host_ip, port=port))
            return connection
    
        def initialize_operator(self):
            connection = self._get_connection(self._username, self._password, self._host_ip, self._port)
            self._queue_connection = connection
            self._input_channel = self._initialize_channel(connection, self._input_queue_name, durable=False)
            self._output_channel = self._initialize_channel(connection, self._output_queue_name, durable= True)
            self._error_channel = self._initialize_channel(connection, self._error_queue_name, durable=True)
    
        def consume(self):
            self._input_channel.basic_qos(prefetch_count=1)
            self._input_channel.basic_consume(self._process_incoming_message, queue=self._input_queue_name)
            self._input_channel.start_consuming()
    
        def _push_to_queue(self, channel, response):
            channel.basic_publish(exchange='', routing_key=self._output_queue_name, body=response,
                                    properties=pika.BasicProperties(delivery_mode=2))  # make message persistent
    
        def _process_incoming_message(self, channel, method, properties, message):
            self.is_busy = True
            processed_result, is_error = self._delegate(message)
    
            if is_error:
                # failed results go to the error queue, not the output queue
                self._error_channel.basic_publish(exchange='', routing_key=self._error_queue_name, body=processed_result,
                                                    properties=pika.BasicProperties(delivery_mode=2))
            else:
                self._output_channel.basic_publish(exchange='', routing_key=self._output_queue_name, body=processed_result,
                                                    properties=pika.BasicProperties(delivery_mode=2))
    
            # send in the final ack of the process.
            channel.basic_ack(delivery_tag=method.delivery_tag)
    
            # close the connection to stop receiving further messages
            if self.mark_to_terminate:
                self._queue_connection.close()
    
            self.is_busy = False
    

    In my main script, I spin up the agents as follows:

    # spins up the agent
    for count in range(spin_up_count):
        instance = Operator(self._translate_and_parse, f'Operator: {time.time()}')
        instance.initialize_operator()
        process = Process(target=instance.consume)
        process.start()
        self._online_agents.append((instance, process))
    

    The problem is that when I call process.start(), I get a TypeError:

    TypeError: can't pickle _thread.lock objects

    Full stack trace:

    File "C:/Users/adity/Documents/PythonProjects/Caligo/Caligo/QueueService.py", line 201, in _scale_up
    process.start()
    File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
    File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
    File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
    File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
    File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
    TypeError: can't pickle _thread.lock objects
    
    1 reply  |  as of 7 years ago
  •  0
  •   Luke Bakken    7 years ago

    The RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.


    Do not instantiate the Operator object before starting the forked process. You also cannot use instance.consume as the target of the forked process.

    The target of the Process instance should be a function that creates the Operator instance and then calls its consume method.
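    The advice above can be sketched roughly as follows. The stack trace shows the Windows spawn path (popen_spawn_win32), where start() pickles the Process object together with its target; a bound method such as instance.consume drags the whole instance along, including whatever lock it holds. StandInOperator, is_picklable and run_operator are hypothetical names used only for illustration, and the stand-in holds a plain threading.Lock in place of a live connection so the example runs without a broker.

```python
import multiprocessing
import pickle
import threading


class StandInOperator:
    """Hypothetical stand-in for Operator: it holds a lock, as a live connection might."""

    def __init__(self, identifier):
        self.identifier = identifier
        self._lock = threading.Lock()  # not picklable

    def consume(self):
        with self._lock:
            print(f"{self.identifier}: consuming")


def is_picklable(obj):
    """Return True if obj can be pickled, which spawn-based start() requires."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def run_operator(identifier):
    """Process target: build the operator inside the child, then consume."""
    operator = StandInOperator(identifier)
    operator.consume()


if __name__ == "__main__":
    # A bound method carries its instance (and the lock) into pickle.
    print("bound method picklable:", is_picklable(StandInOperator("parent").consume))

    processes = []
    for count in range(2):
        # Only the plain arguments cross the process boundary.
        process = multiprocessing.Process(target=run_operator, args=(f"Operator-{count}",))
        process.start()
        processes.append(process)
    for process in processes:
        process.join()
```

    With the original class, run_operator would presumably call Operator(...), then initialize_operator(), then consume(). The delegate passed in would also need to be picklable, for example a module-level function, since a bound method like self._translate_and_parse would pull its owning object along.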

    If you need to manage the forked processes, keep track of their process IDs and use signals to communicate with them.
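    A minimal sketch of the signal-based approach, assuming a POSIX system: the parent records each child's PID and sends SIGTERM, and the child sets a flag in its handler so it can stop after the current unit of work (much like mark_to_terminate in the question). StopFlag and worker are hypothetical names. On Windows, os.kill with SIGTERM terminates the process unconditionally, so the handler would not run there.

```python
import multiprocessing
import os
import signal
import time


class StopFlag:
    """Records that a termination signal has arrived."""

    def __init__(self):
        self.requested = False

    def handle(self, signum, frame):
        # Signal handler: only set a flag; the main loop does the cleanup.
        self.requested = True


def worker(identifier):
    """Process target: loop until the parent asks for shutdown."""
    flag = StopFlag()
    signal.signal(signal.SIGTERM, flag.handle)
    while not flag.requested:
        time.sleep(0.1)  # stand-in for processing one message
    print(f"{identifier}: shutting down cleanly")


if __name__ == "__main__":
    process = multiprocessing.Process(target=worker, args=("Operator-0",))
    process.start()
    pids = [process.pid]  # the parent tracks process IDs, not Operator objects

    time.sleep(0.5)  # give the child time to install its handler
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
    process.join()
```

    Keeping only PIDs in the parent avoids holding any Operator instance on the parent side, which is what triggered the pickling error in the first place.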