You have to create a list of (filename, path) pairs

data = [(img, img_dir) for img in images]

and then map will run each pair in a separate process.
But you have to receive args inside start_pipeline:

def start_pipeline(self, args):
    print('ok starting')
    filename, path = args
    print('filename: {}\npath: {}'.format(filename, path))
    return self.process()

And you have to use () to create an instance of segmenter_class when handing start_pipeline to the pool:

pool.map(segmenter_class().start_pipeline, data)
BTW: in the example code I also return results from the processes.
import os
import multiprocessing


class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()


class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"


class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"


if __name__ == '__main__':
    procedure = 'hsv'

    segmenter_class = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }.get(procedure)

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    results = pool.map(segmenter_class().start_pipeline, data)
    print('Results:', results)

    for result in pool.map(segmenter_class().start_pipeline, data):
        print('result:', result)

    pool.terminate()
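Side note, not from the original approach: if you would rather keep filename and path as separate parameters instead of unpacking one args tuple, the standard Pool.starmap unpacks every tuple for you. A minimal sketch with a hypothetical Segmenter class:

import multiprocessing


class Segmenter(object):
    # hypothetical minimal class, only to illustrate starmap with a bound method

    def process(self):
        return "result"

    def start_pipeline(self, filename, path):
        # starmap unpacks each (filename, path) tuple into separate arguments
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()


if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    data = [(img, 'C:/') for img in images]

    pool = multiprocessing.Pool(3)
    results = pool.starmap(Segmenter().start_pipeline, data)
    print('Results:', results)
    pool.terminate()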
EDIT:

You can also create a function that takes the procedure name together with the data and use that function with map. This way every process creates its own instance of the procedure. Or you can send different procedures to different processes.
import os
import multiprocessing


class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()


class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"


class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"


segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}


def start_process(args):
    procedure = args[0]
    data = args[1:]
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    return result
if __name__ == '__main__':
    procedure = 'hsv'

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(procedure, img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    results = pool.map(start_process, data)
    print('Results:', results)

    # or iterate over the results
    for result in pool.map(start_process, data):
        print('result:', result)

    pool.terminate()
Example with different procedures:
if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    pool = multiprocessing.Pool(3)

    data = [('hsv', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results HSV:', results)

    data = [('lab', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results LAB:', results)

    pool.terminate()
The same works with a single map() that has 6 jobs to start but a Pool(3): it will run only 3 processes at the same time, and whenever a process becomes free, map takes the next value from the list and runs it.
if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]

    data = data_hsv + data_lab

    pool = multiprocessing.Pool(3)

    results = pool.map(start_process, data)
    print('Results:', results)

    for result in pool.map(start_process, data):
        print('results:', result)

    pool.terminate()
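A related option, not in the original answer: if you want to consume results as soon as each worker finishes instead of waiting for the whole list, Pool.imap_unordered yields them in completion order. A short sketch with a stand-in worker function:

import multiprocessing


def start_process(args):
    # stand-in worker, shaped like the start_process defined above
    procedure, filename, path = args
    return 'result {} {}'.format(procedure, filename)


if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    data = [('hsv', img, 'C:/') for img in images] + [('lab', img, 'C:/') for img in images]

    pool = multiprocessing.Pool(3)

    # results arrive as workers finish, not necessarily in input order
    for result in pool.imap_unordered(start_process, data):
        print('result:', result)

    pool.terminate()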
EDIT:

The same code also works with Ray. It only needs

from ray.util import multiprocessing

instead of

import multiprocessing
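For reference, a minimal sketch of the Ray variant; it assumes ray is installed and relies on ray.util.multiprocessing providing a Pool with the same map interface, so only the import line changes:

from ray.util import multiprocessing   # only this line differs from the stdlib examples


def start_process(args):
    # stand-in worker, shaped like the start_process defined above
    procedure, filename, path = args
    return 'result {} {}'.format(procedure, filename)


if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    data = [('hsv', img, 'C:/') for img in images]

    pool = multiprocessing.Pool(3)
    results = pool.map(start_process, data)
    print('Results:', results)
    pool.terminate()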
I didn't use Dask, PySpark or Joblib.
EDIT:

Example with Joblib:
from joblib import Parallel, delayed


class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()


class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"


class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"


segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}


def start_process(args):
    procedure = args[0]
    data = args[1:]
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    return result


if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]

    data = data_hsv + data_lab

    pool = Parallel(n_jobs=3)

    results = pool(delayed(start_process)(args) for args in data)
    print('Results:', results)

    for result in pool(delayed(start_process)(args) for args in data):
        print('result:', result)

    with Parallel(n_jobs=3) as pool:
        results = pool(delayed(start_process)(args) for args in data)
        print('Results:', results)

        for result in pool(delayed(start_process)(args) for args in data):
            print('result:', result)
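A note on the Joblib variant, based on joblib's documented behavior rather than the original answer: using Parallel as a context manager (the with block above) is intended to reuse the same pool of workers across consecutive calls, which can avoid repeated worker startup when you dispatch several batches as in the example.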