代码之家  ›  专栏  ›  技术社区  ›  Bogdan Boamfa

芹菜Gevent池-ConcurrentObjectUseError

  •  6
  • Bogdan Boamfa  · 技术社区  · 6 年前

    我有个芹菜工人在用 gevent 执行HTTP请求并添加另一个具有页面源的芹菜任务的池。

    我使用Django、RabbitMQ作为代理、Redis作为芹菜结果后端、芹菜4.1.0。

    任务已 ignore_result=True 但我经常犯这个错误 ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter...>

    我知道这与Redis连接有关。

    我想不出怎么解决这个问题。 这或多或少就是任务的逻辑。 我还试着在呼叫时使用信号灯 process_task.apply_async 但它没有起作用。

    from gevent.lock import BoundedSemaphore
    
    sem = BoundedSemaphore(1)
    
    
    @app.task(ignore_result=True, queue='request_queue')
    def request_task(url, *args, **kwargs):
        # make the request
        req = requests.get(url)
    
        request = {
            'status_code': req.status_code,
            'content': req.text,
            'headers': dict(req.headers),
            'encoding': req.encoding
        }
        with sem:
            process_task.apply_async(kwargs={'url': url, 'request': request})
        print(f'Done - {url}')
    

    这是堆栈跟踪:

    cancel_wait_ex: [Errno 9] File descriptor was closed in another greenlet
      File "redis/connection.py", line 543, in send_packed_command
        self._sock.sendall(item)
      File "gevent/_socket3.py", line 424, in sendall
        data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
      File "gevent/_socket3.py", line 394, in send
        self._wait(self._write_event)
      File "gevent/_socket3.py", line 156, in _wait
        self.hub.wait(watcher)
      File "gevent/hub.py", line 651, in wait
        result = waiter.get()
      File "gevent/hub.py", line 898, in get
        return self.hub.switch()
      File "gevent/hub.py", line 630, in switch
        return RawGreenlet.switch(self)
    ConnectionError: Error 9 while writing to socket. File descriptor was closed in another greenlet.
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 556, in send_packed_command
        (errno, errmsg))
    OSError: [Errno 9] Bad file descriptor
      File "redis/connection.py", line 126, in _read_from_socket
        data = self._sock.recv(socket_read_size)
      File "gevent/_socket3.py", line 332, in recv
        return _socket.socket.recv(self._sock, *args)
    ConnectionError: Error while reading from socket: (9, 'Bad file descriptor')
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 446, in connect
        self.on_connect()
      File "redis/connection.py", line 520, in on_connect
        if nativestr(self.read_response()) != 'OK':
      File "redis/connection.py", line 577, in read_response
        response = self._parser.read_response()
      File "redis/connection.py", line 238, in read_response
        response = self._buffer.readline()
      File "redis/connection.py", line 168, in readline
        self._read_from_socket()
      File "redis/connection.py", line 143, in _read_from_socket
        (e.args,))
    BlockingIOError: [Errno 11] Resource temporarily unavailable
      File "gevent/_socket3.py", line 390, in send
        return _socket.socket.send(self._sock, data, flags)
    OSError: [Errno 9] Bad file descriptor
      File "redis/connection.py", line 543, in send_packed_command
        self._sock.sendall(item)
      File "gevent/_socket3.py", line 424, in sendall
        data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
      File "gevent/_socket3.py", line 396, in send
        return _socket.socket.send(self._sock, data, flags)
    ConnectionError: Error 9 while writing to socket. Bad file descriptor.
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 556, in send_packed_command
        (errno, errmsg))
    BlockingIOError: [Errno 11] Resource temporarily unavailable
      File "gevent/_socket3.py", line 390, in send
        return _socket.socket.send(self._sock, data, flags)
    OSError: [Errno 9] Bad file descriptor
      File "redis/connection.py", line 543, in send_packed_command
        self._sock.sendall(item)
      File "gevent/_socket3.py", line 424, in sendall
        data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
      File "gevent/_socket3.py", line 396, in send
        return _socket.socket.send(self._sock, data, flags)
    ConnectionError: Error 9 while writing to socket. Bad file descriptor.
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 556, in send_packed_command
        (errno, errmsg))
    BlockingIOError: [Errno 11] Resource temporarily unavailable
      File "gevent/_socket3.py", line 390, in send
        return _socket.socket.send(self._sock, data, flags)
    ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f271b53dea0>>
      File "celery/app/trace.py", line 374, in trace_task
        R = retval = fun(*args, **kwargs)
      File "celery/app/trace.py", line 629, in __protected_call__
        return self.run(*args, **kwargs)
      File "drones/tasks.py", line 330, in blue_drone_request_task
        blue_drone_process_task.apply_async(kwargs={'targetpage': targetpage, 'request': request})
      File "celery/app/task.py", line 536, in apply_async
        **options
      File "celery/app/base.py", line 736, in send_task
        self.backend.on_task_call(P, task_id)
      File "celery/backends/redis.py", line 189, in on_task_call
        self.result_consumer.consume_from(task_id)
      File "celery/backends/redis.py", line 76, in consume_from
        self._consume_from(task_id)
      File "celery/backends/redis.py", line 82, in _consume_from
        self._pubsub.subscribe(key)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2172, in _execute
        connection.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 538, in send_packed_command
        self.connect()
      File "redis/connection.py", line 455, in connect
        callback(self)
      File "redis/client.py", line 2120, in on_connect
        self.subscribe(**channels)
      File "redis/client.py", line 2229, in subscribe
        ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
      File "redis/client.py", line 2161, in execute_command
        self._execute(connection, connection.send_command, *args)
      File "redis/client.py", line 2165, in _execute
        return command(*args)
      File "redis/connection.py", line 563, in send_command
        self.send_packed_command(self.pack_command(*args))
      File "redis/connection.py", line 543, in send_packed_command
        self._sock.sendall(item)
      File "gevent/_socket3.py", line 424, in sendall
        data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
      File "gevent/_socket3.py", line 394, in send
        self._wait(self._write_event)
      File "gevent/_socket3.py", line 150, in _wait
        raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
    
    1 回复  |  直到 6 年前
        1
  •  4
  •   Bogdan Boamfa    6 年前

    我不确定这是否是正确的答案,但通过将CELERY\u RESULT\u BACKEND设置为None,我不再看到此错误。

    我希望这有帮助。

    仅供参考,此错误仅在使用gevent池时发生。我尝试过的芹菜版本:3.1、4.2.1。