代码之家  ›  专栏  ›  技术社区  ›  nonagon

工作进程刷新时的python多处理池通知

  •  0
  • nonagon  · 技术社区  · 5 年前

    multiprocessing.Pool 管理一个由3名工人组成的人才库。每个worker都相当复杂,并且在某些第三方代码中存在资源泄漏(可能),在连续运行6-8小时后会导致问题。所以我想用 maxtasksperchild 让工人定期更新。

    我也希望每个工人写自己的单独的日志文件。没有 最大任务存档 我使用共享的 multiprocessing.Value 要为每个辅助进程分配一个整数(0、1或2),请使用该整数命名日志文件。

    最大任务存档 我想重用日志文件一旦一个工人完成。因此,如果整个过程运行一个月,我只需要三个日志文件,而不是每个生成的worker都需要一个日志文件。

    如果我能回电(例如 finalizer 随波逐流 initializer 目前支持),这将是直接的。没有这一点,我就看不到一个健壮而简单的方法。

    0 回复  |  直到 5 年前
        1
  •  1
  •   Darkonaut    5 年前

    那是没有文件的,但是 multiprocessing 有一个 Finalizer initializer .

    我不明白 multiprocessing.Value 不过,在这种情况下,这是一个有用的同步选择。多个worker可以同时退出,这意味着哪个文件整数是空闲的,而不是一个(锁定的)计数器所能提供的。

    我建议使用多个裸体 multiprocessing.Lock s、 每个文件一个,而不是:

    from multiprocessing import Pool, Lock, current_process
    from multiprocessing.util import Finalize
    
    
    def f(n):
        global fileno
        for _ in range(int(n)):  # xrange for Python 2
            pass
        return fileno
    
    
    def init_fileno(file_locks):
        for i, lock in enumerate(file_locks):
            if lock.acquire(False):  # non-blocking attempt
                globals()['fileno'] = i
                print("{} using fileno: {}".format(current_process().name, i))
                Finalize(lock, lock.release, exitpriority=15)
                break
    
    
    if __name__ == '__main__':
    
        n_proc = 3
        file_locks = [Lock() for _ in range(n_proc)]
    
        pool = Pool(
            n_proc, initializer=init_fileno, initargs=(file_locks,),
            maxtasksperchild=2
        )
    
        print(pool.map(func=f, iterable=[50e6] * 18))
        pool.close()
        pool.join()
        # all locks should be available if all finalizers did run
        assert all(lock.acquire(False) for lock in file_locks)
    

    ForkPoolWorker-1 using fileno: 0
    ForkPoolWorker-2 using fileno: 1
    ForkPoolWorker-3 using fileno: 2
    ForkPoolWorker-4 using fileno: 0
    ForkPoolWorker-5 using fileno: 1
    ForkPoolWorker-6 using fileno: 2
    [0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
    
    Process finished with exit code 0
    

    请注意,对于python3,您无法可靠地使用Pool的上下文管理器,而不是上面所示的旧方法。池的上下文管理器(很不幸)调用 terminate() ,这可能会杀死工作进程 之前 终结器有机会运行。

        2
  •  0
  •   nonagon    5 年前

    我最终选择了以下的方式。它假设pid不会很快被回收(在Ubuntu上是这样,但在Unix上一般不会)。我不认为它有任何其他的假设,但我真的只是对Ubuntu感兴趣,所以我没有仔细研究Windows等其他平台。

    def run_pool():
        child_pids = Array('i', 3)
        pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)
    
    def init_worker(child_pids):
        with child_pids.get_lock():
            available_index = None
            for index, pid in enumerate(child_pids):
                # PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
                # which are no longer in use. We also reclaim the lucky case where a PID was recycled
                # but assigned to one of our workers again, so we know we can take it over
                if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
                    available_index = index
                    break
    
            if available_index is not None:
                child_pids[available_index] = os.getpid()
            else:
                # This is unexpected - it means all of the PIDs are in use so we have a logical error
                # or a PID was recycled before we could notice and reclaim its index
                pass
    
    def _is_pid_in_use(pid):
        try:
            os.kill(pid, 0)
            return True
        except OSError:
            return False