代码之家  ›  专栏  ›  技术社区  ›  Ma0

正在与多处理队列斗争

  •  2
  • Ma0  · 技术社区  · 7 年前

    我的结构(大规模简化)如下所示:

    import multiprocessing
    
    def creator():
        # creates files
        return
    
    
    def relocator():
        # moves created files
        return
    
    
    create = multiprocessing.Process(target=creator)
    relocate = multiprocessing.Process(target=relocator)
    create.start()
    relocate.start()
    

    我想做的是创建一组文件 creator 一旦 它们被创建并移动到另一个目录 relocator .

    我想使用的原因 multiprocessing 以下是:

    • 我不想 造物主 等待搬家先结束,因为搬家需要时间,我不想浪费时间。
    • 也不能在开始复制之前先创建所有文件,因为驱动器中没有足够的空间容纳所有文件。

    我想要两个 造物主 重新定位器 进程是串行的(每次一个文件),但并行运行。操作的“日志”应如下所示:

    # creating file 1
    # creating file 2 and relocating file 1
    # creating file 3 and relocating file 2
    # ...
    # relocating last file
    

    根据我所读到的, Queue 就是去这里的路。

    策略: (也许不是最好的?!)

    创建文件后,它将进入队列,在完成重新定位后,它将从队列中删除。

    然而,我对它的编码有问题;同时创建多个文件(的多个实例 造物主 并行运行)和其他。。。

    如有任何想法、提示、解释等,我将不胜感激

    1 回复  |  直到 7 年前
        1
  •  1
  •   Netwave    7 年前

    让我们接受您的想法,并在此功能中进行拆分:

    1. 造物主 应创建文件(例如100个)

    2. 重新定位器 应一次移动1个文件,直到没有更多文件可移动

    3. 创建者可以在 重新定位器 所以它也可以 把自己变成一个 重新定位器 双方都必须知道什么时候 完成

    因此,我们有两个主要功能:

    def create(i):
        # creates files and return outpath
        return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))
    
    
    def relocate(from, to):
        # moves created files
        shuttil.move(from, to)
    

    现在让我们创建我们的流程:

    from multiprocessing import Process, Queue
    
    comm_queue = Queue()
    
    #process that create the files and push the data into the queue
    def creator(comm_q):
        for i in range(100):
            comm_q.put(create(i))
        comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker
    
    #the relocator works till it gets an stop flag
    def relocator(comm_q):
        data = comm_q.get()
        while data != "STOP_FLAG":
            if data:
                relocate(data, to_path_you_may_want)
            data = comm_q.get()
    
    creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
    relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
    creator_process.start()
    relocators .start()
    

    这样我们现在就有了一个创建者和一个重定位器,但是,让我们说现在我们想要 造物主 要在创建作业完成后开始重新定位,我们可以使用重新定位器,但我们需要再推一个 "STOP_FLAG" 因为我们将有2个流程重新定位

    def creator(comm_q):
        for i in range(100):
            comm_q.put(create(i))
        for _ in range(2):
            comm_q.put("STOP_FLAG")
        relocator(comm_q)
    

    假设我们现在需要任意数量的重定位程序进程,我们应该稍微调整代码来处理这个问题,我们需要 creator 方法来了解通知其他进程何时停止的标志数量,我们的结果代码如下所示:

    from multiprocessing import Process, Queue, cpu_count
    
    comm_queue = Queue()
    
    #process that create the files and push the data into the queue
    def creator(comm_q, number_of_subprocesses):
        for i in range(100):
            comm_q.put(create(i))
        for _ in range(number_of_subprocesses + 1): # we need to count ourselves
            comm_q.put("STOP_FLAG")
        relocator(comm_q)
    
    #the relocator works till it gets an stop flag
    def relocator(comm_q):
        data = comm_q.get()
        while data != "STOP_FLAG":
            if data:
                relocate(data, to_path_you_may_want)
            data = comm_q.get()
    
    num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
    creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
    relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
    creator_process.start()
    for rp in relocators:
        rp.start()
    

    那么你必须 等待 让他们完成:

    creator_process.join()
    for rp in relocators:
        rp.join()
    

    您可以在 multiprocessing.Queue documentation

    特别是 get method (默认情况下是阻塞调用)

    从队列中删除并返回项目。如果可选参数块为 True(默认值),超时值为None(默认值),如果 直到有可用的项目为止。