
Object-oriented multiprocessing of images

  •  0
  • honeymoon  · Tech Community  · 4 years ago

    I want to analyze several images in parallel with multiprocessing, using my classes:

    import os
    import cv2

    class SegmentationType(object):
        DISPLAY_NAME = "invalid"
    
        def __init__(self, filename, path):
            self.filename = filename
            self.path = path
            self.input_data = None
            self.output_data = None
    
    
        def read_image(self):
            self.input_data = cv2.imread(self.path + self.filename)
    
        def write_image(self):
            cv2.imwrite(self.path + self.filename.split('.')[0] + '_' + self.DISPLAY_NAME + '.png', self.output_data)
    
        def process(self):
            # override in derived classes to perform an actual segmentation
            pass
    
        def start_pipeline(self):
            self.read_image()
            self.process()
            self.write_image()
    
    class HSV_Segmenter(SegmentationType):
        DISPLAY_NAME = 'HSV'
    
        def process(self):
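            # rgb_to_hsv / treshold_otsu (and rgb_to_lab / global_threshold below)
            # are helper functions assumed to be defined elsewhere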
            source = rgb_to_hsv(self.input_data)
            self.output_data = treshold_otsu(source)
    
    
    class LabSegmenter(SegmentationType):
        DISPLAY_NAME = 'LAB'
    
        def process(self):
            source = rgb_to_lab(self.input_data)
            self.output_data = global_threshold(source)
    
    
    segmenter_class = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }.get(procedure)
    
    if not segmenter_class:
        raise ValueError("Invalid segmentation method '{}'".format(procedure))
    
    for img in images:
        os.chdir(img_dir)
        processor = segmenter_class(img, img_dir)
        processor.start_pipeline()
    

    However, I don't know how to call the map function:

    import multiprocessing
    import numpy as np

    image_lst = os.listdir(my_image_path)
    
    # We split the list into sublists of 5 elements because of a 512 GB RAM limitation
    if len(image_lst) > 4:
        nr_of_sublists = int(len(image_lst) / 2.5)
        image_sub_lst = np.array_split(image_lst, nr_of_sublists)
    else:
        image_sub_lst = [image_lst]
    
    # We do the analysis for each sublist
    for sub_lst in image_sub_lst:
        print (sub_lst)
        pool = multiprocessing.Pool(8)
        
        # Call the processor
        processor = segmenter_class(img, img_dir)
        processor.start_pipeline()
        # How to call map???
        pool.map(?, sub_lst)
        pool.terminate()
        
    

    EDIT:

    I tried to change the code as suggested in the comments, but I still get an error:

    import os
    import multiprocessing
    
    class SegmentationType(object):
        DISPLAY_NAME = "invalid"
    
        def __init__(self):
            print ('init')
    
        def read_image(self):
            print ('read')
    
        def write_image(self):
            print ('write')
    
        def process(self):
            # override in derived classes to perform an actual segmentation
            pass
    
        def start_pipeline(self, args):
            print ('ok starting')
            filename, path = args
            print(filename, path)
            self.process()
    
    class HSV_Segmenter(SegmentationType):
        DISPLAY_NAME = 'HSV'
    
        def process(self):
            print ('ok HSV')
    
    class LabSegmenter(SegmentationType):
        DISPLAY_NAME = 'LAB'
    
        def process(self):
            print ('ok LAB')
    
    procedure = 'hsv'
    segmenter_class = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter
    }.get(procedure)
    
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(3)
        pool.map(segmenter_class.start_pipeline, [images, img_dir])
        pool.terminate()
    

    Error:

    The above exception was the direct cause of the following exception:

    Traceback (most recent call last):
      File "C:\Users/lueck/PycharmProjects/hymale_cmd/hymale_cmd/multi.py", line 50, in <module>
        pool.map(segmenter_class.start_pipeline, [images, img_dir])
      File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hymale_env\lib\multiprocessing\pool.py", line 266, in map
        return self._map_async(func, iterable, mapstar, chunksize).get()
      File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hymale_env\lib\multiprocessing\pool.py", line 644, in get
        raise self._value
    TypeError: start_pipeline() missing 1 required positional argument: 'args'

    0 replies  |  4 years ago
        1
  •  2
  •   furas  ·  3 years ago

    You have to create a list of pairs (filename, path):

    data = [(img, img_dir) for img in images]
    

    Then map will run each pair in a separate process.

    But then you have to unpack args inside start_pipeline:

        def start_pipeline(self, args):
            print('ok starting')
            
            filename, path = args
            print('filename: {}\npath: {}'.format(filename, path))
            
            return self.process()
    

    You have to use () to create an instance of segmenter_class on which start_pipeline is called. Without the parentheses, start_pipeline is an unbound function, so map's single argument is consumed as self and args stays missing, which is exactly what the TypeError above complains about:

    pool.map(segmenter_class().start_pipeline, data)
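
    As a side note, a minimal alternative assuming Python 3.3+: pool.starmap() can unpack each (filename, path) tuple for you, provided start_pipeline is rewritten to take the two values directly:

    # assumed variant: def start_pipeline(self, filename, path): ...
    pool.starmap(segmenter_class().start_pipeline, data)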
    

    BTW: in the example code I also return the result from the process.


    import os
    import multiprocessing
    
    class SegmentationType(object):
        DISPLAY_NAME = "invalid"
    
        def __init__(self):
            print('init')
    
        def read_image(self):
            print('read')
    
        def write_image(self):
            print('write')
    
        def process(self):
            # override in derived classes to perform an actual segmentation
            pass
    
        def start_pipeline(self, args):
            print('ok starting')
            
            filename, path = args
            print('filename: {}\npath: {}'.format(filename, path))
            
            return self.process()
    
    class HSV_Segmenter(SegmentationType):
        DISPLAY_NAME = 'HSV'
    
        def process(self):
            print('ok HSV')
            return "result HSV"
        
    class LabSegmenter(SegmentationType):
        DISPLAY_NAME = 'LAB'
    
        def process(self):
            print('ok LAB')
            return "result LAB"
    
    if __name__ == '__main__':
    
        procedure = 'hsv'
        
        segmenter_class = {
            'hsv': HSV_Segmenter,
            'lab': LabSegmenter,
        }.get(procedure)
        
        images = ['01.png', '02.png', '03.png']
        img_dir = 'C:/'
        
        data = [(img, img_dir) for img in images]
        
        pool = multiprocessing.Pool(3)
    
        # example 1
    
        results = pool.map(segmenter_class().start_pipeline, data)
        print('Results:', results)
    
        # example 2
    
        for result in pool.map(segmenter_class().start_pipeline, data):
            print('result:', result)
    
        pool.terminate()
    

    EDIT:

    You can also create a function that receives the procedure name together with the data and use that function with map. This way every process creates its own instance of the chosen segmenter, or you can send different procedures to different processes:

    import os
    import multiprocessing
    
    class SegmentationType(object):
        DISPLAY_NAME = "invalid"
    
        def __init__(self):
            print('init')
    
        def read_image(self):
            print('read')
    
        def write_image(self):
            print('write')
    
        def process(self):
            # override in derived classes to perform an actual segmentation
            pass
    
        def start_pipeline(self, args):
            print('ok starting')
            
            filename, path = args
            print('filename: {}\npath: {}'.format(filename, path))
            
            return self.process()
    
    class HSV_Segmenter(SegmentationType):
        DISPLAY_NAME = 'HSV'
    
        def process(self):
            print('ok HSV')
            return "result HSV"
        
    class LabSegmenter(SegmentationType):
        DISPLAY_NAME = 'LAB'
    
        def process(self):
            print('ok LAB')
            return "result LAB"
    
    segmenters = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }
    
    def start_process(args):
    
        procedure = args[0]
        data = args[1:]
    
        segmenter_class = segmenters.get(procedure)
        result = segmenter_class().start_pipeline(data)
    
        return result
        
    if __name__ == '__main__':
    
        procedure = 'hsv'
        
        images = ['01.png', '02.png', '03.png']
        img_dir = 'C:/'
        
        data = [(procedure, img, img_dir) for img in images]
        
        pool = multiprocessing.Pool(3)
    
        # example 1
    
        results = pool.map(start_process, data)
        print('Results:', results)
    
        # example 2
    
        for result in pool.map(start_process, data):
            print('result:', result)
    
        pool.terminate()
    

    An example with different procedures:

    if __name__ == '__main__':
    
        images = ['01.png', '02.png', '03.png']
        img_dir = 'C:/'
        
        pool = multiprocessing.Pool(3)
    
        data = [('hsv', img, img_dir) for img in images]
        results = pool.map(start_process, data)
        print('Results HSV:', results)
    
        data = [('lab', img, img_dir) for img in images]
        results = pool.map(start_process, data)
        print('Results LAB:', results)
    
        pool.terminate()
    

    The same works with a single map() call that gets 6 tasks to start: with Pool(3) it will run only 3 processes at the same time, and whenever one of them becomes free, map takes the next value from the list and runs it in that process.

    if __name__ == '__main__':
    
        images = ['01.png', '02.png', '03.png']
        img_dir = 'C:/'
        
        data_hsv = [('hsv', img, img_dir) for img in images]
        data_lab = [('lab', img, img_dir) for img in images]
        
        data = data_hsv + data_lab
    
        pool = multiprocessing.Pool(3)
    
        # example 1
    
        results = pool.map(start_process, data)
        print('Results:', results)
    
        # example 2
    
        for result in pool.map(start_process, data):
            print('results:', result)
    
        pool.terminate()
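
    Since Python 3.3, multiprocessing.Pool can also be used as a context manager, which terminates the pool automatically. A minimal sketch, assuming start_process and the data_hsv/data_lab lists from the example above:

    if __name__ == '__main__':

        data = data_hsv + data_lab

        # the with-block terminates the pool automatically on exit
        with multiprocessing.Pool(3) as pool:
            results = pool.map(start_process, data)

        print('Results:', results)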
    

    EDIT:

    It also works with Ray.

    It only needs

    from ray.util import multiprocessing
    

    instead of

    import multiprocessing
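
    A minimal sketch of the same fan-out with Ray, assuming start_process and data are defined as in the examples above (the Pool starts a local Ray cluster on first use):

    from ray.util import multiprocessing

    if __name__ == '__main__':

        images = ['01.png', '02.png', '03.png']
        img_dir = 'C:/'
        data = [('hsv', img, img_dir) for img in images]

        pool = multiprocessing.Pool(3)
        results = pool.map(start_process, data)
        print('Results:', results)
        pool.terminate()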
    

    I didn't test it with Dask, PySpark, or Joblib.
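
    For Dask, an untested sketch of the same fan-out might look like this, assuming dask.distributed is installed and start_process and data are defined as in the examples above:

    from dask.distributed import Client

    if __name__ == '__main__':

        client = Client(n_workers=3)               # local cluster with 3 worker processes
        futures = client.map(start_process, data)  # one task per (procedure, img, img_dir) tuple
        results = client.gather(futures)           # collect results in submission order
        print('Results:', results)
        client.close()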


    EDIT:

    An example with Joblib:

    from joblib import Parallel, delayed
    
    class SegmentationType(object):
        DISPLAY_NAME = "invalid"
    
        def __init__(self):
            print('init')
    
        def read_image(self):
            print('read')
    
        def write_image(self):
            print('write')
    
        def process(self):
            # override in derived classes to perform an actual segmentation
            pass
    
        def start_pipeline(self, args):
            print('ok starting')
            
            filename, path = args
            print('filename: {}\npath: {}'.format(filename, path))
            
            return self.process()
    
    class HSV_Segmenter(SegmentationType):
        DISPLAY_NAME = 'HSV'
    
        def process(self):
            print('ok HSV')
            return "result HSV"
    
    class LabSegmenter(SegmentationType):
        DISPLAY_NAME = 'LAB'
    
        def process(self):
            print('ok LAB')
            return "result LAB"
    
    segmenters = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }
    
    def start_process(args):
        
        procedure = args[0]
        data = args[1:]
        
        segmenter_class = segmenters.get(procedure)
        result = segmenter_class().start_pipeline(data)
        
        return result
    
    if __name__ == '__main__':
    
        images = ['01.png', '02.png', '03.png']
        img_dir = 'C:/'
        
        data_hsv = [('hsv', img, img_dir) for img in images]
        data_lab = [('lab', img, img_dir) for img in images]
        
        data = data_hsv + data_lab
    
        # --- version 1 ---
    
        #pool = Parallel(n_jobs=3, backend='threading')
        #pool = Parallel(n_jobs=3, backend='multiprocessing')
        pool = Parallel(n_jobs=3)
        
        # example 1
        
        results = pool( delayed(start_process)(args) for args in data )
        print('Results:', results)
    
        # example 2
        
        for result in pool( delayed(start_process)(args) for args in data ):
            print('result:', result)
    
        # --- version 2 ---
        
        #with Parallel(n_jobs=3, backend='threading') as pool:
        #with Parallel(n_jobs=3, backend='multiprocessing') as pool:
        with Parallel(n_jobs=3) as pool:
    
            # example 1
            
            results = pool( delayed(start_process)(args) for args in data )
            print('Results:', results)
    
        # example 2
    
            for result in pool( delayed(start_process)(args) for args in data ):
                print('result:', result)