代码之家  ›  专栏  ›  技术社区  ›  Angel_M

Python并发。futures使用子流程,运行多个python脚本

  •  2
  • Angel_M  · 技术社区  · 7 年前

    我想同时使用concurrent运行几个python脚本。期货 我的代码的串行版本在文件夹中查找特定的python文件并执行它。

    import re
    import os
    import glob
    import re
    from glob import glob
    import concurrent.futures as cf
    
    FileList = [];
    import time
    FileList = [];
    start_dir = os.getcwd();
    pattern   = "Read.py"
    
    for dir,_,_ in os.walk(start_dir):
        FileList.extend(glob(os.path.join(dir,pattern))) ;
    
    FileList
    
    i=0
    for file in FileList:
        dir=os.path.dirname((file))
        dirname1 = os.path.basename(dir) 
        print(dirname1)
        i=i+1
        Str='python '+ file
        print(Str)
        completed_process = subprocess.run(Str)`
    

    对于我的代码的并行版本:

        def Python_callback(future):
        print(future.run_type, future.jid)
        return "One Folder finished executing"
    
    def Python_execute():
        from concurrent.futures import ProcessPoolExecutor as Pool
        args = FileList
        pool = Pool(max_workers=1)
        future = pool.submit(subprocess.call, args, shell=1)
        future.run_type = "run_type"
        future.jid = FileList
        future.add_done_callback(Python_callback)
        print("Python executed")
    
    if __name__ == '__main__':
        import subprocess
        Python_execute()
    

    问题是我不知道如何将文件列表的每个元素传递给单独的cpu

    提前感谢您的帮助

    1 回复  |  直到 7 年前
        1
  •  3
  •   abarnert    7 年前

    最小的变化是使用 submit 每个元素一次,而不是整个列表一次:

    futures = []
    for file in FileList:
        future = pool.submit(subprocess.call, file, shell=1)
        future.blah blah
        futures.append(future)
    

    这个 futures 只有当您想对期货做一些事情时,列表才是必要的——等待它们完成,检查它们的返回值,等等。

    同时,您显式地使用 max_workers=1 。毫不奇怪,这意味着您将只获得1个工作子进程,因此它最终将等待一个子进程完成,然后再获取下一个子进程。如果要同时运行它们,请删除 max_workers 并将其默认为每个核心一个(或通过 max_workers=8 或者其他号码 1 ,如果您有充分的理由覆盖默认设置)。


    虽然我们在做这件事,但有很多方法可以简化您的工作:

    • 你真的需要 multiprocessing 在这里如果您需要与每个子流程进行通信,那么在单个线程中进行通信可能会很痛苦,但是线程,或者 asyncio ,将与此处的流程一样有效。
    • 更重要的是,看起来您实际上并不需要任何东西,只需要启动流程并等待它完成,这可以通过简单的同步代码来完成。
    • 为什么要构建字符串并使用 shell=1 而不是仅仅传递一个列表而不使用shell?不必要地使用shell会产生开销、安全问题和调试麻烦。
    • 你真的不需要 jid 在每一个将来,它只是所有调用字符串的列表,这是没有用的。可能更有用的是某种标识符,或者子流程返回代码,或者可能还有很多其他的东西,但它们都可以通过读取 subprocess.call 或者一个简单的包装器。
    • 您也不需要回拨。如果你把所有的未来都收集在一个列表中 as_completed 这样,您就可以更简单地打印结果。
    • 如果你同时做这两件事,你就只剩下一个 pool.submit 在循环内部–这意味着您可以将整个循环替换为 pool.map
    • 您很少需要或想要混合 os.walk glob 。当您实际有一个全局模式时,应用 fnmatch 超过 files 列表发件人 操作系统。步行 。但在这里,您只需在每个目录中查找一个特定的文件名,所以实际上,您需要过滤的只是 file == 'Read.py'
    • 您没有使用 i 在您的循环中。但如果你真的需要,最好 for i, file in enumerate(FileList): 而不是去做 for file in FileList: 并手动增加