代码之家  ›  专栏  ›  技术社区  ›  Srikar Appalaraju Tonetel

python->多处理模块

  •  6
  • Srikar Appalaraju Tonetel  · 技术社区  · 14 年前

    1. 我有大约一百万个文件需要解析&将解析后的内容附加到一个文件中。
    2. 在Python中不使用线程,因为它本质上是运行单个进程(由于GIL)。
    3. 因此使用多处理模块。i、 e.产生4个子进程来利用所有原始核心能量:)

    到目前为止还不错,现在我需要一个所有子进程都可以访问的共享对象。我正在使用多处理模块中的队列。此外,所有子进程都需要将其输出写入单个文件。我想这是一个使用锁的好地方。当我运行这个设置时,我没有得到任何错误(因此父进程看起来很好),它只是暂停。当我按下ctrl-C时,我看到一个回溯(每个子进程一个)。也没有输出写入输出文件。下面是代码(请注意,没有多个进程,一切都可以正常运行)-

    import os
    import glob
    from multiprocessing import Process, Queue, Pool
    
    data_file  = open('out.txt', 'w+')
    
    def worker(task_queue):
        for file in iter(task_queue.get, 'STOP'):
            data = mine_imdb_page(os.path.join(DATA_DIR, file))
            if data:
                data_file.write(repr(data)+'\n')
        return
    
    def main():
        task_queue = Queue()
        for file in glob.glob('*.csv'):
            task_queue.put(file)
        task_queue.put('STOP') # so that worker processes know when to stop
    
        # this is the block of code that needs correction.
        if multi_process:
            # One way to spawn 4 processes
            # pool = Pool(processes=4) #Start worker processes
            # res  = pool.apply_async(worker, [task_queue, data_file])
    
            # But I chose to do it like this for now.
            for i in range(4):
                proc = Process(target=worker, args=[task_queue])
                proc.start()
        else: # single process mode is working fine!
            worker(task_queue)
        data_file.close()
        return
    

    我做错什么了?我还尝试在生成时将openfileu对象传递给每个进程。但毫无效果。e、 g- Process(target=worker, args=[task_queue, data_file]) . 但这并没有改变什么。我觉得子进程由于某种原因无法写入文件。或者 file_object

    额外费用: 还有什么方法可以保持一个持久的mysql\u连接处于打开状态并将其传递给sub\u进程?因此,我在父进程中打开了一个mysql连接&打开的连接应该可以被我的所有子进程访问。基本上,这相当于python中的共享内存。有什么想法吗?

    2 回复  |  直到 14 年前
        1
  •  4
  •   Srikar Appalaraju Tonetel    14 年前

    虽然与埃里克的讨论富有成效,但后来我找到了更好的方法。在多处理模块中有一个名为“Pool”的方法,非常适合我的需要。

    它会根据我的系统拥有的内核数进行自我优化。i、 e.生成的进程数与核心数相同。当然这是可定制的。这是密码。以后可能会帮助别人-

    from multiprocessing import Pool
    
    def main():
        po = Pool()
        for file in glob.glob('*.csv'):
            filepath = os.path.join(DATA_DIR, file)
            po.apply_async(mine_page, (filepath,), callback=save_data)
        po.close()
        po.join()
        file_ptr.close()
    
    def mine_page(filepath):
        #do whatever it is that you want to do in a separate process.
        return data
    
    def save_data(data):
        #data is a object. Store it in a file, mysql or...
        return
    

    还在经历这个巨大的模块。不确定save_data()是由父进程执行还是由派生的子进程使用此函数。如果是子级进行保存,则在某些情况下可能会导致并发问题。如果有人有使用本模块的经验,你会很感激这里有更多的知识。。。

        2
  •  3
  •   Eric Snow    14 年前

    http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

    我确信每个进程都会得到一个新的解释器,然后将目标(函数)和参数加载到其中。在这种情况下,脚本中的全局名称空间将被绑定到worker函数,因此data\u文件将在那里。但是,我不确定在文件描述符被复制时会发生什么。您是否尝试将文件对象作为参数之一传递?

    put 结果和主代码 get 保存结果并将其写入文件。