代码之家  ›  专栏  ›  技术社区  ›  sberry

python:从线程化子进程的stdout读取非阻塞

  •  0
  • sberry  · 技术社区  · 14 年前

    我有一个脚本(worker.py),它以表单的形式打印未缓冲的输出…

    1
    2
    3
    .
    .
    .
    n
    

    其中n是这个脚本中的循环将要进行的一些常量迭代次数。在另一个脚本(service_controller.py)中,我启动了多个线程,每个线程使用subprocess.popen(stdout=subprocess.pipe,…)启动子进程;现在,在我的主线程(service_controller.py)中,我想读取每个线程的worker.py子进程的输出,并使用它来计算完成前剩余时间的估计值。

    我有所有的逻辑工作,从worker.py读取stdout并确定最后一个打印的数字。问题是我不知道如何以一种非阻塞的方式来实现这一点。如果我读取一个常量bufsize,那么每次读取都会等待来自每个工人的相同数据。我尝试过很多方法,包括使用fcntl、select+os.read等。我在这里的最佳选择是什么?如果需要的话,我可以发布我的源代码,但是我认为解释足够好地描述了这个问题。

    谢谢你的帮助。

    编辑
    添加示例代码

    我有一个启动子流程的工人。

    class WorkerThread(threading.Thread):
        def __init__(self):
            self.completed = 0
            self.process = None
            self.lock = threading.RLock()
            threading.Thread.__init__(self)
    
        def run(self):
            cmd = ["/path/to/script", "arg1", "arg2"]
            self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
            #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
            #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
    
        def get_completed(self):
            self.lock.acquire();
            fd = select.select([self.process.stdout.fileno()], [], [], 5)[0]
            if fd:
                self.data += os.read(fd, 1)
                try:
                    self.completed = int(self.data.split("\n")[-2])
                except IndexError:
                    pass
            self.lock.release()
            return self.completed
    

    然后我有了一个三管理员。

    class ThreadManager():
        def __init__(self):
            self.pool = []
            self.running = []
            self.lock = threading.Lock()
    
        def clean_pool(self, pool):
            for worker in [x for x in pool is not x.isAlive()]:
                worker.join()
                pool.remove(worker)
                del worker
            return pool
    
        def run(self, concurrent=5):
            while len(self.running) + len(self.pool) > 0:
                self.clean_pool(self.running)
                n = min(max(concurrent - len(self.running), 0), len(self.pool))
                if n > 0:
                    for worker in self.pool[0:n]:
                        worker.start()
                    self.running.extend(self.pool[0:n])
                    del self.pool[0:n]
                time.sleep(.01)
             for worker in self.running + self.pool:
                 worker.join()
    

    以及一些运行它的代码。

    threadManager = ThreadManager()
    for i in xrange(0, 5):
        threadManager.pool.append(WorkerThread())
    threadManager.run()
    

    我已经删除了其他代码的日志,希望能找出问题所在。

    2 回复  |  直到 13 年前
        1
  •  2
  •   dweeves    14 年前

    没有让您的服务控制器被I/O访问阻塞,只有线程循环应该读取自己控制的进程输出。

    然后,可以让线程对象中的方法控制进程,以获取最后一次轮询的输出。

    当然,在这种情况下,不要忘记使用某种锁定机制来保护线程将用来填充缓冲区的缓冲区,以及由控制器调用以获取缓冲区的方法。

        2
  •  1
  •   Jeff Younker    13 年前

    方法workerthread.run()启动子进程,然后立即终止。run()需要执行轮询并更新workerthread.completed,直到子进程完成。