代码之家  ›  专栏  ›  技术社区  ›  sberry

Python:使用线程多次调用subprocess.Popen

  •  -1
  • sberry  · 技术社区  · 16 年前

    我已经在其他几个应用程序中使用了这个设置,在这些应用程序中,我希望在我的类中运行带有代码的函数。但是,当我在每个线程调用的函数中使用subprocess.Popen调用时,调用会一次运行一次,而不是像我预期的那样并发运行。

    class ProcService(jsonrpc.JSONRPC):
            self.thread_pool = []
            self.running_threads = []
            self.lock = threading.Lock()
    
            def clean_pool(self, thread_pool, join=False):
                    for th in [x for x in thread_pool if not x.isAlive()]:
                            if join: th.join()
                            thread_pool.remove(th)
                            del th
                    return thread_pool
    
            def run_threads(self, parallel=10):
                    while len(self.running_threads)+len(self.thread_pool) > 0:
                            self.clean_pool(self.running_threads, join=True)
                            n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool))
                            if n > 0:
                                    for th in self.thread_pool[0:n]: th.start()
                                    self.running_threads.extend(self.thread_pool[0:n])
                                    del self.thread_pool[0:n]
                            time.sleep(.01)
                    for th in self.running_threads+self.thread_pool: th.join()
    
            def jsonrpc_run_procs(self):
                    for i, item in enumerate(self.items):
                            if item.should_run():
                                    self.thread_pool.append(threading.Thread(target=self.run_proc, args=tuple([item])))
                    self.run_threads(5)
    
            def run_proc(self, proc):
                    self.lock.acquire()
                    print "\nSubprocess started"
                    p = subprocess.Popen('%s/program_to_run.py %s' %(os.getcwd(), proc.data), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,)
                    stdout_value = proc.communicate('through stdin to stdout')[0]
                    self.lock.release()
    

    欢迎提供任何帮助/建议。

    def运行程序(自身,程序): self.lock.acquire() self.running_procs.append([p,proc.data.id])

    在我调用self.run_线程(5)之后,我调用self.check_procs()

    check_procs方法迭代正在运行的_procs的列表,以检查poll()是否为None。如何从管道中获取输出?我试过以下两种方法

    calling check_procs once:
    
    def check_procs(self):
        for proc_details in self.running_procs:
            proc = proc_details[0]
            while (proc.poll() == None):
                time.sleep(0.1)
            stdout_value = proc.communicate('through stdin to stdout')[0]
            self.running_procs.remove(proc_details)
            print proc_details[1], stdout_value
            del proc_details
    

    calling check_procs in while loop like:
    
    while len(self.running_procs) > 0:
        self.check_procs()
    
    def check_procs(self):
        for proc_details in self.running_procs:
            if (proc.poll() is not None):
                stdout_value = proc.communicate('through stdin to stdout')[0]
                self.running_procs.remove(proc_details)
                print proc_details[1], stdout_value
                del proc_details
    
    2 回复  |  直到 16 年前
        1
  •  1
  •   ACoolie    16 年前

    您的具体问题可能是由线路引起的 stdout_value = proc.communicate('through stdin to stdout')[0] . Subprocess.communicate将 "Wait for process to terminate" ,与锁一起使用时,将一次运行一个。

    您只需添加 p 变量,然后运行并使用子流程API等待子流程完成。定期轮询主线程中的每个子进程。

    再看一眼,这一行似乎也有问题: for th in self.running_threads+self.thread_pool: th.join() . join()是另一个等待线程完成的方法。

        2
  •  1
  •   Alex Martelli    16 年前

    我认为关键代码是:

        self.lock.acquire()
        print "\nSubprocess started"
        p = subprocess.Popen( # etc
        stdout_value = proc.communicate('through stdin to stdout')[0]
        self.lock.release()
    

    对acquire和release的显式调用应该保证序列化——如果您在这个块中执行其他操作而不是子流程使用,您不也会一成不变地观察序列化吗?

    编辑 stdout_value Queue.Queue() get (或 get_nowait 等)一旦他们准备好并被 put 那里一般来说 Queue 是Python中安排线程通信(通常也是同步)的最佳方式,任何时候都可以用这种方式安排。

    特别是:添加 import Queue self.lock (删除这三行即可);添加 self.q = Queue.Queue() __init__ ; 电话刚打完 stdout_value = proc.communicate(... 添加一条语句 self.q.put(stdout_value) ; 现在完成这项工作 jsonrpc_run_procs 方法

    while not self.q.empty():
      result = self.q.get()
      print 'One result is %r' % result
    

    确认所有结果都存在。(通常情况下 empty 队列的方法是不可靠的,但在这种情况下,放入队列的所有线程都已完成,所以您应该没事)。