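Let's take your idea and split it into these parts:

- `creator` should create the files (e.g. 100 of them)
- `relocator` should move 1 file at a time until there are no more files to move
- `creator` may finish before `relocator`, so it can also turn itself into a `relocator`
- both have to know when to finish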
So we have two main functions:
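```python
import os
import shutil

def create(i):
    # creates the i-th file and returns its output path
    return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))

def relocate(src, dst):
    # moves a created file ("from" is a reserved word, so it can't be a parameter name)
    shutil.move(src, dst)
```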
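The two functions above are stubs. As a sketch of what they might do with real files, here is a version where `create` writes an empty file into a source directory and `relocate` moves it into a destination directory. The extra `src_dir`/`dst_dir` parameters and the temporary directories are assumptions for illustration only, not part of the original design.

```python
import os
import shutil
import tempfile


def create(i, src_dir):
    # Hypothetical variant: write an empty file named "<i>.ext" and return its path
    path = os.path.join(src_dir, "{}.ext".format(i))
    with open(path, "w"):
        pass
    return path


def relocate(src, dst_dir):
    # shutil.move puts src inside dst_dir when dst_dir is an existing directory
    shutil.move(src, dst_dir)


src_dir = tempfile.mkdtemp()
dst_dir = tempfile.mkdtemp()
for i in range(3):
    relocate(create(i, src_dir), dst_dir)

print(sorted(os.listdir(dst_dir)))  # ['0.ext', '1.ext', '2.ext']
print(os.listdir(src_dir))          # []
```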
Now let's create our processes:
```python
from multiprocessing import Process, Queue

# process that creates the files and pushes their paths into the queue
def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    # tell the worker when to stop; we push just one flag since we only have one other worker
    comm_q.put("STOP_FLAG")

# the relocator works until it gets a stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)  # placeholder: your destination path
        data = comm_q.get()

if __name__ == "__main__":
    comm_queue = Queue()
    # args must be a tuple, hence the trailing comma
    creator_process = Process(target=creator, args=(comm_queue,))
    relocator_process = Process(target=relocator, args=(comm_queue,))
    creator_process.start()
    relocator_process.start()
```
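The `relocator` loop can also be written with the two-argument form of the built-in `iter(callable, sentinel)`, which keeps calling `comm_q.get()` until it returns `"STOP_FLAG"`. This is a minimal sketch that uses a plain `queue.Queue` in a single process so it runs on its own (a `multiprocessing.Queue` offers the same `put`/`get` calls); the `moved` list is a hypothetical stand-in for `relocate`.

```python
import queue


def relocator(comm_q, moved):
    # iter(callable, sentinel) calls comm_q.get() repeatedly and stops
    # as soon as the returned value equals "STOP_FLAG"
    for data in iter(comm_q.get, "STOP_FLAG"):
        if data:
            moved.append(data)  # stand-in for relocate(data, to_path_you_may_want)


q = queue.Queue()
for i in range(5):
    q.put("{}.ext".format(i))
q.put("STOP_FLAG")

moved = []
relocator(q, moved)
print(moved)  # ['0.ext', '1.ext', '2.ext', '3.ext', '4.ext']
```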
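So now we have one creator and one relocator. But say we now want the `creator` to start relocating once its creation job is done. We can reuse `relocator` for that, but we need to push one more `"STOP_FLAG"`, because there will be 2 processes relocating: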
```python
def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    # one flag for the other relocator and one for ourselves
    for _ in range(2):
        comm_q.put("STOP_FLAG")
    relocator(comm_q)
```
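Each relocator consumes exactly one `"STOP_FLAG"`, so the number of flags has to match the number of consumers. Here is a sketch of what happens when it does not: two consumers share a queue that holds only a single flag, so one of them stays blocked in `get()`. Threads stand in for processes only so the example is self-contained, and the `timeout` value is arbitrary.

```python
import queue
import threading


def relocator(comm_q, done):
    data = comm_q.get()
    while data != "STOP_FLAG":
        data = comm_q.get()
    done.append(threading.current_thread().name)  # record which consumer exited


q = queue.Queue()
for i in range(10):
    q.put(i)
q.put("STOP_FLAG")  # only one flag for two consumers

done = []
workers = [
    threading.Thread(target=relocator, args=(q, done), name="w{}".format(n), daemon=True)
    for n in range(2)
]
for w in workers:
    w.start()
for w in workers:
    w.join(timeout=0.5)  # daemon threads let the script exit even if one never returns

still_waiting = [w.name for w in workers if w.is_alive()]
print(len(done), len(still_waiting))  # 1 1
```

Pushing one flag per consumer, as the `range(2)` loop in the `creator` does, lets both of them exit.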
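Now suppose we want an arbitrary number of relocator processes. We need to tweak the code slightly: the `creator` method has to know how many flags to push to tell the other processes when to stop. The resulting code looks like this: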
```python
from multiprocessing import Process, Queue, cpu_count

# process that creates the files and pushes their paths into the queue
def creator(comm_q, number_of_subprocesses):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(number_of_subprocesses + 1):  # +1 because we need to count ourselves
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

# the relocator works until it gets a stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)  # placeholder: your destination path
        data = comm_q.get()

if __name__ == "__main__":
    comm_queue = Queue()
    num_of_cpus = cpu_count()  # we will spawn as many relocator processes as we have CPU cores
    creator_process = Process(target=creator, args=(comm_queue, num_of_cpus))
    relocators = [Process(target=relocator, args=(comm_queue,)) for _ in range(num_of_cpus)]
    creator_process.start()
    for rp in relocators:
        rp.start()
```
Then you have to `join` them to wait for them to finish:
```python
creator_process.join()
for rp in relocators:
    rp.join()
```
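To check the flag arithmetic end to end, here is a thread-based sketch of the same layout: one creator that pushes `number_of_subprocesses + 1` flags and then relocates too, plus `number_of_subprocesses` extra relocators, all joined at the end. Threads and the shared `moved` list are substitutions for illustration (the list stands in for `relocate`, the generated names for `create`). With `multiprocessing` you would pass the queue the same way, but you could not collect results in a plain Python list, because processes do not share memory.

```python
import queue
import threading

STOP_FLAG = "STOP_FLAG"


def relocator(comm_q, moved, lock):
    for data in iter(comm_q.get, STOP_FLAG):
        with lock:
            moved.append(data)  # stand-in for relocate(data, ...)


def creator(comm_q, number_of_subprocesses, moved, lock):
    for i in range(100):
        comm_q.put("{}.ext".format(i))  # stand-in for create(i)
    # one flag per extra relocator, plus one for the creator itself
    for _ in range(number_of_subprocesses + 1):
        comm_q.put(STOP_FLAG)
    relocator(comm_q, moved, lock)


num_workers = 4  # arbitrary; the original uses cpu_count()
comm_queue = queue.Queue()
moved, lock = [], threading.Lock()

creator_thread = threading.Thread(target=creator, args=(comm_queue, num_workers, moved, lock))
relocators = [
    threading.Thread(target=relocator, args=(comm_queue, moved, lock))
    for _ in range(num_workers)
]
creator_thread.start()
for r in relocators:
    r.start()

# wait for everyone; each thread returns after consuming exactly one flag
creator_thread.join()
for r in relocators:
    r.join()

print(len(moved), len(set(moved)), comm_queue.empty())  # 100 100 True
```

Because the queue is FIFO and every flag is pushed after all 100 items, no worker can stop before the items are drained.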
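You can read more about this in the `multiprocessing.Queue` documentation, especially the `get` method (which is a blocking call by default):

> Remove and return an item from the queue. If optional args *block* is True (the default) and *timeout* is None (the default), block if necessary until an item is available.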
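Because `get()` blocks by default, a relocator that never receives a `"STOP_FLAG"` waits forever. Here is a small single-process sketch of the alternative the same docs describe: passing a `timeout` makes `get()` raise `queue.Empty` when nothing arrives in time. The 0.1 s and 1 s timeouts are arbitrary choices.

```python
import queue
from multiprocessing import Queue

q = Queue()

# With a timeout, get() stops waiting and raises queue.Empty instead of blocking forever
try:
    q.get(timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True
print(timed_out)  # True

q.put("STOP_FLAG")
# put() hands the item to a background feeder thread, so give get() a short timeout
# rather than calling get_nowait() right away
item = q.get(timeout=1)
print(item)  # STOP_FLAG
```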