
Using two thread pools at the same time in Python

  • Subhayan Bhattacharya  · 5 years ago

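    I am trying to understand thread pools in Python, and I tried to come up with an example in which one thread pool produces data while another thread pool consumes it.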

    Here is my naive attempt:

    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    from time import sleep
    import threading
    import random
    
    final_results = {}
    
    def sample_function(n):
        threadname = threading.current_thread().name
        print(f"Starting execution of thread with the name {threadname} and argument {n}")
        sleep(random.randint(1, 5))
        print(f"About to finish executing for thread name {threadname}")
        return n ** 2
    
    def sample_function2(number, lock):
        threadname = threading.current_thread().name
        print(f"Starting execution of thread from 2nd pool with the name {threadname} and working on number {number}")
        with lock:
            final_results[number] = number * 2
        print(f"Completing execution of thread with the name {threadname}")
    
    
    pool2 = ThreadPoolExecutor(max_workers=3)
    numbers1 = [1, 2, 3, 4, 5, 6]
    with ThreadPoolExecutor(max_workers=3) as executor:
        lock = threading.Lock()
        futures = []
        for result in executor.map(sample_function, numbers1):
            future = pool2.submit(sample_function2, (result, lock))
            futures.append(future)
    
    for future in futures:
        while not future.done():
            sleep(2)
    
    print("Everything is done ... checking results now !")
    print(final_results)
    

    Starting execution of thread with the name ThreadPoolExecutor-1_0 and argument 1
    Starting execution of thread with the name ThreadPoolExecutor-1_1 and argument 2
    Starting execution of thread with the name ThreadPoolExecutor-1_2 and argument 3
    About to finish executing for thread name ThreadPoolExecutor-1_2
    About to finish executing for thread name ThreadPoolExecutor-1_1
    Starting execution of thread with the name ThreadPoolExecutor-1_2 and argument 4
    Starting execution of thread with the name ThreadPoolExecutor-1_1 and argument 5
    About to finish executing for thread name ThreadPoolExecutor-1_0
    Starting execution of thread with the name ThreadPoolExecutor-1_0 and argument 6
    About to finish executing for thread name ThreadPoolExecutor-1_1
    About to finish executing for thread name ThreadPoolExecutor-1_2
    About to finish executing for thread name ThreadPoolExecutor-1_0
    Everything is done ... checking results now !
    {}
    

    Is this the right way to approach the problem, or do thread pools use some kind of queue to communicate with each other?

    Either way, I need to get the second pool working.

  •   martineau    5 years ago

    Here is at least a partial answer to your question. It fixes the problem of the second pool not "starting", which had several causes.

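    Two of them are that you were not calling pool2.submit() correctly and were not retrieving the results from it (you need to append the value returned by calling future.result()).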
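    To illustrate the first point, here is a minimal sketch of how submit() treats its arguments. The function name consume is a hypothetical stand-in for sample_function2(). When the arguments are packed into one tuple, the function receives a single positional argument, the call raises a TypeError inside the worker, and the exception is stored in the Future instead of being printed. That would explain why the original run completed silently with an empty dictionary.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def consume(number, lock):
    # Hypothetical stand-in for sample_function2().
    with lock:
        return number * 2

lock = threading.Lock()
with ThreadPoolExecutor(max_workers=1) as pool:
    bad = pool.submit(consume, (4, lock))   # one tuple: consume() gets a single argument
    good = pool.submit(consume, 4, lock)    # two separate positional arguments

# Leaving the with block waits for both tasks to finish.
print(bad.done())                       # True, even though the call failed
print(type(bad.exception()).__name__)   # TypeError
print(good.result())                    # 8
```

    Calling result() on the failed Future would re-raise the stored exception, which is one reason to call it even when the return value is not needed.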

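    In the revised code the futures list contains only None values, because sample_function2() does not return anything and the list now stores the value of future.result() rather than the Future itself. The while not future.done(): check would therefore fail, since None has no done method to call, so I removed the whole for future in futures: loop. It is unnecessary anyway: by that point every submitted task has finished, because of the way the preceding loop is written (it calls submit() and then immediately waits for the task to finish by calling result()).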
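    If the Future objects themselves are kept in the list, the polling loop can also be replaced by a single blocking call. The sketch below assumes that approach and uses concurrent.futures.wait(), which by default returns once every Future has completed. The function slow_double is a hypothetical example task.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep

def slow_double(n):
    # Hypothetical task that takes a moment to finish.
    sleep(0.1)
    return n * 2

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_double, n) for n in (1, 2, 3)]
    # Blocks until all futures are done; no sleep-and-check loop needed.
    done, not_done = wait(futures)
    values = sorted(f.result() for f in done)

print(values)  # [2, 4, 6]
```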

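    Note that the pools are not doing much concurrent processing, so this does not take full advantage of what they offer. That said, I don't really understand why you feel you must have them, so it's hard to know what to suggest you do.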

    from concurrent.futures import ThreadPoolExecutor
    from time import sleep
    import threading
    import random
    
    final_results = {}
    
    def sample_function(n):
        threadname = threading.current_thread().name
        print(f"Starting execution of thread with the name {threadname} and argument {n}")
        sleep(random.randint(1, 5))
        print(f"About to finish executing for thread name {threadname}")
        return n ** 2
    
    def sample_function2(number, lock):
        threadname = threading.current_thread().name
        print(f"Starting execution of thread from 2nd pool with the name {threadname} and working on number {number}")
        with lock:
            final_results[number] = number * 2
        print(f"Completing execution of thread with the name {threadname}")
    
    
    numbers1 = [1, 2, 3, 4, 5, 6]
    
    pool2 = ThreadPoolExecutor(max_workers=3)
    with ThreadPoolExecutor(max_workers=3) as executor:
        lock = threading.Lock()
        futures = []
        for result in executor.map(sample_function, numbers1):
            future = pool2.submit(sample_function2, result, lock)  # Pass args like this.
            futures.append(future.result())  # Must call result(), but value always None.
    
    print("Everything is done ... checking results now!")
    print(final_results)
    

    Starting execution of thread with the name ThreadPoolExecutor-1_0 and argument 1
    Starting execution of thread with the name ThreadPoolExecutor-1_1 and argument 2
    Starting execution of thread with the name ThreadPoolExecutor-1_2 and argument 3
    About to finish executing for thread name ThreadPoolExecutor-1_1
    Starting execution of thread with the name ThreadPoolExecutor-1_1 and argument 4
    About to finish executing for thread name ThreadPoolExecutor-1_2
    Starting execution of thread with the name ThreadPoolExecutor-1_2 and argument 5
    About to finish executing for thread name ThreadPoolExecutor-1_0
    Starting execution of thread with the name ThreadPoolExecutor-1_0 and argument 6
    Starting execution of thread from 2nd pool with the name ThreadPoolExecutor-0_0 and working on number 1
    Completing execution of thread with the name ThreadPoolExecutor-0_0
    Starting execution of thread from 2nd pool with the name ThreadPoolExecutor-0_0 and working on number 4
    Completing execution of thread with the name ThreadPoolExecutor-0_0
    Starting execution of thread from 2nd pool with the name ThreadPoolExecutor-0_0 and working on number 9
    Completing execution of thread with the name ThreadPoolExecutor-0_0
    About to finish executing for thread name ThreadPoolExecutor-1_2
    About to finish executing for thread name ThreadPoolExecutor-1_1
    Starting execution of thread from 2nd pool with the name ThreadPoolExecutor-0_0 and working on number 16
    Completing execution of thread with the name ThreadPoolExecutor-0_0
    Starting execution of thread from 2nd pool with the name ThreadPoolExecutor-0_1 and working on number 25
    Completing execution of thread with the name ThreadPoolExecutor-0_1
    About to finish executing for thread name ThreadPoolExecutor-1_0
    Starting execution of thread from 2nd pool with the name ThreadPoolExecutor-0_0 and working on number 36
    Completing execution of thread with the name ThreadPoolExecutor-0_0
    Everything is done ... checking results now!
    {1: 2, 4: 8, 9: 18, 16: 32, 25: 50, 36: 72}
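
    The output above shows the second pool handling one item at a time, because each result() call blocks before the next submit(). One possible way to let the two pools overlap is sketched below. It is not part of the original answer, and the names produce, consume, producers and consumers are hypothetical. Each first-stage result is handed to the second pool as soon as it is ready, and the second-stage futures are only waited on at the end.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

final_results = {}
lock = threading.Lock()

def produce(n):
    # First stage: stand-in for sample_function().
    return n ** 2

def consume(number):
    # Second stage: stand-in for sample_function2().
    with lock:
        final_results[number] = number * 2

numbers = [1, 2, 3, 4, 5, 6]
with ThreadPoolExecutor(max_workers=3) as producers:
    with ThreadPoolExecutor(max_workers=3) as consumers:
        produced = [producers.submit(produce, n) for n in numbers]
        # Hand each value to the second pool as soon as it is available,
        # without waiting for the consumer task to finish.
        consumed = [consumers.submit(consume, f.result())
                    for f in as_completed(produced)]
        for f in consumed:
            f.result()  # re-raises any exception from a consumer task

print(final_results)
```

    The insertion order of the dictionary depends on which tasks finish first, so only its contents are predictable, not the order in which keys appear.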