代码之家  ›  专栏  ›  技术社区  ›  sudonym

如何控制python的ThreadPoolExecutor的吞吐量速度?

  •  1
  • sudonym  · 技术社区  · 6 年前

    我正在使用python启动异步任务 concurrent.futures ThreadPoolExecutor 。 下列的 this 方法中,我使用 tqdm 进度条。

    我的代码如下所示 :

    with concurrent.futures.ThreadPoolExecutor(max_workers = n_jobs) as executor:
        future_to_url = {executor.submit(target_function, URL): URL for URL in URL_list}
        kwargs = {'total': len(future_to_url), # For tqdm
                'unit': 'URL',                 # For tqdm
                'unit_scale': True,            # For tqdm
                'leave': False,                # For tqdm
                'miniters': 50,                # For tqdm
                'desc': 'Scraping Progress'}
        for future in tqdm(concurrent.futures.as_completed(future_to_url), **kwargs):
                URL = future_to_url[future]
                try:
                    data = future.result()     # Concurrent calls
                except Exception as exc:
                    error_handling()           # Handle errors
                else:
                    result_handling()          # Handle non-errors
    

    控制台输出如下所示 :

    Scraping Progress:   9%|▉  | 3.35k/36.2k [08:18<1:21:22, 6.72URL/s] # I want < 6/s
    Scraping Progress:   9%|▉  | 3.40k/36.2k [08:26<1:21:16, 6.72URL/s] # I want < 6/s
    Scraping Progress:  10%|▉  | 3.45k/36.2k [08:30<1:20:40, 6.76URL/s] # I want < 6/s
    Scraping Progress:  10%|▉  | 3.50k/36.2k [08:40<1:20:51, 6.73URL/s] # I want < 6/s
    Scraping Progress:  10%|▉  | 3.55k/36.2k [08:46<1:20:36, 6.74URL/s] # I want < 6/s
    Scraping Progress:  10%|▉  | 3.60k/36.2k [08:52<1:20:17, 6.76URL/s] # I want < 6/s
    

    我知道我可以设置URL队列并控制其大小,如下所述 here

    然而,我不知道如何控制吞吐量速度本身。假设我希望不超过每秒6个URL。除了投入时间之外,还能通过其他方式存档吗。睡眠(n)至 target_function() 在上述示例中?

    如何有效控制 ThreadPoolExecutor 在python中 同时发生的期货 ?

    1 回复  |  直到 6 年前
        1
  •  2
  •   Hannu    6 年前

    简而言之,没有这样的方法。声明池后,如果不先关闭池并重新创建它,就无法更改工作线程的数量。也没有办法使池馈送任务的速度低于工人的最大速度。

    你有几个(不是那么理想的)选择。

    一种是向worker添加基于全局变量的睡眠。然后,您可以使用任务完成的回调来测量实际速度并相应地调整变量。但如果睡觉是不可能的,这就行不通了。

    更好的方法(尽管更加繁重)是自己编写任务管理器。在此版本中,您不使用池,而是编写一个管理工作进程的类。您生成了“足够多”的工作人员,这些工作人员会侦听任务队列。您将以所需的速度从经理处输入此队列。您将队列设置为具有非常低的最大大小,如果您的经理检测到队列已满,则会生成另一个工作线程。

    但是没有内置的功能来做您想要做的事情,这意味着需要做一些工作,或者您需要重新设计您的程序,这样您就不会一次性将所有任务提供给池,而是在那里进行一些限制。