
Is there a thread pool similar to the multiprocessing Pool?

  •  292
  • Martin  · Tech Community  · 14 years ago

    Is there a Pool class for worker threads, similar to the multiprocessing module's Pool class?

    For example, I like the easy way to parallelize a map function:

    import multiprocessing

    def long_running_func(p):
        c_func_no_gil(p)   # IO-bound C call that releases the GIL

    p = multiprocessing.Pool(4)
    xs = p.map(long_running_func, range(100))
    

    However, I would like to do it without the overhead of creating new processes.

    I know about the GIL. However, in my use case the function will be an IO-bound C function for which the Python wrapper will release the GIL before the actual function call.

    Do I have to write my own threading pool?

    8 answers  |  last updated 6 years ago
        1
  •  383
  •   Martin    14 years ago

    I just found out that there actually is a thread-based pool interface in the multiprocessing module; however, it is somewhat hidden and not properly documented.

    It can be imported via

    from multiprocessing.pool import ThreadPool
    

    It is implemented using a dummy Process class that wraps a Python thread. This thread-based Process class lives in multiprocessing.dummy, which is mentioned briefly in the docs. The dummy module supposedly provides the whole multiprocessing interface based on threads.
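
    As a hedged sketch, the question's example could be rewritten for this pool like so (the body of long_running_func is a stand-in for the question's GIL-releasing C call):

    from multiprocessing.pool import ThreadPool

    def long_running_func(p):
        pass   # stand-in for the question's c_func_no_gil(p)

    pool = ThreadPool(4)   # 4 worker threads instead of 4 processes
    xs = pool.map(long_running_func, range(100))
    pool.close()
    pool.join()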

        2
  •  163
  •   asmeurer    10 years ago

    In Python 3 you can use concurrent.futures.ThreadPoolExecutor, i.e.:

    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=10)
    a = executor.submit(my_function)   # my_function is the caller's task
    

    See the docs for more information and examples.
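
    As a slightly fuller sketch (my_function here is a made-up placeholder task), the executor can also be used as a context manager and results collected from the returned futures:

    from concurrent.futures import ThreadPoolExecutor

    def my_function(x):
        return x * x   # placeholder task

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(my_function, i) for i in range(20)]
        results = [f.result() for f in futures]   # blocks until each task finishes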

        3
  •  50
  •   martineau    7 years ago

    Yes, and it seems to have (more or less) the same API.

    import multiprocessing
    import multiprocessing.pool   # needed so multiprocessing.pool.ThreadPool resolves
    
    def worker(lnk):
        ....    
    def start_process():
        .....
    ....
    
    if(PROCESS):
        pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
    else:
        pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                               initializer=start_process)
    
    pool.map(worker, inputs)
    ....
    
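
    As an illustration only, a runnable version of that pattern might look like the following; PROCESS, POOL_SIZE, worker and start_process are filled in with made-up stand-ins:

    import multiprocessing
    import multiprocessing.pool

    PROCESS = False          # made-up flag: True for processes, False for threads
    POOL_SIZE = 4

    def start_process():
        pass                 # per-worker initialization would go here

    def worker(lnk):
        return len(str(lnk)) # stand-in for the real work

    if __name__ == '__main__':
        if PROCESS:
            pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
        else:
            pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
                                                   initializer=start_process)
        print(pool.map(worker, range(10)))
        pool.close()
        pool.join()
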
        4
  •  36
  •   Jonathan Lonowski    7 years ago

    For something very simple and lightweight (slightly modified from here):

    from Queue import Queue
    from threading import Thread
    
    
    class Worker(Thread):
        """Thread executing tasks from a given tasks queue"""
        def __init__(self, tasks):
            Thread.__init__(self)
            self.tasks = tasks
            self.daemon = True
            self.start()
    
        def run(self):
            while True:
                func, args, kargs = self.tasks.get()
                try:
                    func(*args, **kargs)
                except Exception, e:
                    print e
                finally:
                    self.tasks.task_done()
    
    
    class ThreadPool:
        """Pool of threads consuming tasks from a queue"""
        def __init__(self, num_threads):
            self.tasks = Queue(num_threads)
            for _ in range(num_threads):
                Worker(self.tasks)
    
        def add_task(self, func, *args, **kargs):
            """Add a task to the queue"""
            self.tasks.put((func, args, kargs))
    
        def wait_completion(self):
            """Wait for completion of all the tasks in the queue"""
            self.tasks.join()
    
    if __name__ == '__main__':
        from random import randrange
        from time import sleep
    
        delays = [randrange(1, 10) for i in range(100)]
    
        def wait_delay(d):
            print 'sleeping for (%d)sec' % d
            sleep(d)
    
        pool = ThreadPool(20)
    
        for i, d in enumerate(delays):
            pool.add_task(wait_delay, d)
    
        pool.wait_completion()
    

    To support callbacks on task completion, you can simply add the callback to the task tuple, as sketched below.
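
    Here is a Python 3 sketch of that idea; the class names CallbackWorker and CallbackThreadPool are illustrative, not part of the original answer:

    from queue import Queue
    from threading import Thread


    class CallbackWorker(Thread):
        """Worker running (func, callback, args, kwargs) tuples from a queue."""
        def __init__(self, tasks):
            Thread.__init__(self)
            self.tasks = tasks
            self.daemon = True
            self.start()

        def run(self):
            while True:
                func, callback, args, kwargs = self.tasks.get()
                try:
                    result = func(*args, **kwargs)
                    if callback is not None:
                        callback(result)       # invoked on task completion
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()


    class CallbackThreadPool:
        """Pool whose tasks carry an optional completion callback."""
        def __init__(self, num_threads):
            self.tasks = Queue(num_threads)
            for _ in range(num_threads):
                CallbackWorker(self.tasks)

        def add_task(self, func, callback=None, *args, **kwargs):
            """Queue a task together with a callback for its return value."""
            self.tasks.put((func, callback, args, kwargs))

        def wait_completion(self):
            """Wait for completion of all the tasks in the queue."""
            self.tasks.join()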

        5
  •  7
  •   Dimitri Mestdagh    7 years ago

    Hi, to use the thread pool in Python you can use this library:

    from multiprocessing.dummy import Pool as ThreadPool
    

    Then it can be used like this:

    # assumed to run inside a function, hence the return at the end
    pool = ThreadPool(threads)
    results = pool.map(service, tasks)
    pool.close()
    pool.join()
    return results
    

    Here, threads is the number of threads that you want, and tasks is the list of task items that get mapped to the service function.
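
    For illustration, an end-to-end example with made-up service and tasks values:

    from multiprocessing.dummy import Pool as ThreadPool

    def service(url):
        return (url, len(url))   # made-up stand-in for real per-task work

    tasks = ['https://example.com/a', 'https://example.com/b', 'https://example.com/c']

    pool = ThreadPool(4)         # 4 worker threads
    results = pool.map(service, tasks)
    pool.close()
    pool.join()
    print(results)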

        6
  •  3
  •   forumulator    6 years ago

    Here is what I finally ended up using. It's a modified version of the class by dgorissen above.

    File: threadpool.py

    from queue import Queue, Empty
    import threading
    from threading import Thread
    
    
    class Worker(Thread):
        _TIMEOUT = 2
        """ Thread executing tasks from a given tasks queue. Thread is signalable, 
            to exit
        """
        def __init__(self, tasks, th_num):
            Thread.__init__(self)
            self.tasks = tasks
            self.daemon, self.th_num = True, th_num
            self.done = threading.Event()
            self.start()
    
        def run(self):       
            while not self.done.is_set():
                try:
                    func, args, kwargs = self.tasks.get(block=True,
                                                       timeout=self._TIMEOUT)
                    try:
                        func(*args, **kwargs)
                    except Exception as e:
                        print(e)
                    finally:
                        self.tasks.task_done()
                except Empty as e:
                    pass
            return
    
        def signal_exit(self):
            """ Signal to thread to exit """
            self.done.set()
    
    
    class ThreadPool:
        """Pool of threads consuming tasks from a queue"""
        def __init__(self, num_threads, tasks=[]):
            self.tasks = Queue(num_threads)
            self.workers = []
            self.done = False
            self._init_workers(num_threads)
            for task in tasks:
                self.tasks.put(task)
    
        def _init_workers(self, num_threads):
            for i in range(num_threads):
                self.workers.append(Worker(self.tasks, i))
    
        def add_task(self, func, *args, **kwargs):
            """Add a task to the queue"""
            self.tasks.put((func, args, kwargs))
    
        def _close_all_threads(self):
            """ Signal all threads to exit and lose the references to them """
            for workr in self.workers:
                workr.signal_exit()
            self.workers = []
    
        def wait_completion(self):
            """Wait for completion of all the tasks in the queue"""
            self.tasks.join()
    
        def __del__(self):
            self._close_all_threads()
    
    
    def create_task(func, *args, **kwargs):
        return (func, args, kwargs)
    

    To use the pool:

    from threadpool import ThreadPool   # the file defined above
    from random import randrange
    from time import sleep
    
    delays = [randrange(1, 10) for i in range(30)]
    
    def wait_delay(d):
        print('sleeping for (%d)sec' % d)
        sleep(d)
    
    pool = ThreadPool(20)
    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)
    pool.wait_completion()
    
        7
  •  2
  •   unbeli    14 years ago

    The overhead of creating new processes is minimal, especially when there are only 4 of them. I doubt this is a performance hot spot of your application. Keep it simple, and optimize where you have to and where profiling results point.

        8
  •  1
  •   crizCraig    9 years ago

    There is no built-in thread-based pool. However, it can be implemented quickly with the Queue class.

    From: https://docs.python.org/2/library/queue.html

    from threading import Thread
    from Queue import Queue
    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()
    
    q = Queue()
    for i in range(num_worker_threads):
         t = Thread(target=worker)
         t.daemon = True
         t.start()
    
    for item in source():
        q.put(item)
    
    q.join()       # block until all tasks are done
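
    Filled in with made-up stand-ins for do_work, num_worker_threads, and source(), a runnable Python 3 variant of the same pattern could look like this:

    from threading import Thread
    from queue import Queue          # the Queue module was renamed to queue in Python 3

    NUM_WORKER_THREADS = 4

    def do_work(item):
        print('processing', item)    # stand-in for the real work

    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

    q = Queue()
    for _ in range(NUM_WORKER_THREADS):
        t = Thread(target=worker, daemon=True)
        t.start()

    for item in range(20):           # stand-in for source()
        q.put(item)

    q.join()                         # block until all tasks are done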