
Why doesn't multiprocessing.Lock() lock the shared resource in Python?

  •  10 votes
  •  Fomalhaut  ·  asked 7 years ago

    Suppose I have a very large text file consisting of many lines that I want to reverse, and I don't care about the final order of the lines. The input file contains Cyrillic symbols. I use multiprocessing to process it on several cores.

    I wrote the following program:

    # task.py
    
    import multiprocessing as mp
    
    
    POOL_NUMBER = 2
    
    
    lock_read = mp.Lock()
    lock_write = mp.Lock()
    
    fi = open('input.txt', 'r')
    fo = open('output.txt', 'w')
    
    def handle(line):
        # In the future I want to do
        # some more complicated operations over the line
        return line.strip()[::-1]  # Reversing
    
    def target():
        while True:
            try:
                with lock_read:
                    line = next(fi)
            except StopIteration:
                break
    
            line = handle(line)
    
            with lock_write:
                print(line, file=fo)
    
    pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
    for p in pool:
        p.start()
    for p in pool:
        p.join()
    
    fi.close()
    fo.close()
    

    The program fails with this error:

    Process Process-2:
    Process Process-1:
    Traceback (most recent call last):
      File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
        self.run()
      File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "task.py", line 22, in target
        line = next(fi)
      File "/usr/lib/python3.5/codecs.py", line 321, in decode
        (result, consumed) = self._buffer_decode(data, self.errors, final)
    UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
    Traceback (most recent call last):
      File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
        self.run()
      File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "task.py", line 22, in target
        line = next(fi)
      File "/usr/lib/python3.5/codecs.py", line 321, in decode
        (result, consumed) = self._buffer_decode(data, self.errors, final)
    UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte
    

    On the other hand, everything works fine if I set POOL_NUMBER = 1, but that makes no sense if I want to gain overall performance.

    Why does this error happen, and how can I fix it?

    Python 3.5.2

    I generate the data with the following script:

    # gen_file.py
    
    from random import randint
    
    
    LENGTH = 100
    SIZE = 100000
    
    
    def gen_word(length):
        return ''.join(
            chr(randint(ord('а'), ord('я')))
            for _ in range(length)
        )
    
    
    if __name__ == "__main__":
        with open('input.txt', 'w') as f:
            for _ in range(SIZE):
                print(gen_word(LENGTH), file=f)
    
    2 Answers  |  7 years ago
        1
  •  4 votes
  •   Or Duan  ·  answered 7 years ago

    The problem here is that reading a file from multiple processes doesn't work the way you might expect: you can't share the open file object between processes. When the children are forked, each one inherits its own copy of the buffered reader, but they all share the same underlying OS file descriptor and file offset. The lock does serialize the next(fi) calls, yet it can't help: every refill pulls a large raw chunk into that process's private buffer, and the chunk boundaries fall at arbitrary byte offsets, often in the middle of a line or even in the middle of a two-byte Cyrillic character. That is exactly what the UnicodeDecodeError about an 'invalid start byte' / 'invalid continuation byte' at position 0 is telling you.
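    You can see the shared offset directly with a small experiment of my own (a sketch, assuming the same input.txt and the default fork start method on Linux, as in the question): both forked children bypass their buffers and read raw bytes from the inherited descriptor, and they receive consecutive ranges rather than the same bytes.

    import multiprocessing as mp

    f = open('input.txt', 'rb')  # binary mode, so we can inspect raw bytes

    def show_chunk(name):
        # f.raw bypasses Python's buffering and reads straight from the
        # file descriptor, which the forked children share with the parent
        chunk = f.raw.read(20)
        print(name, chunk)

    if __name__ == '__main__':
        ps = [mp.Process(target=show_chunk, args=(n,)) for n in ('A', 'B')]
        for p in ps:
            p.start()
        for p in ps:
            p.join()

    The two processes print different, consecutive byte ranges: the offset is shared, so a two-byte Cyrillic character can end up split between them, and a buffered text reader that starts decoding at such a boundary raises exactly the UnicodeDecodeError shown above.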

    You could keep a global current_line counter and, on every iteration, re-read the file up to the current line before processing it; that works, but it is not ideal (a rough sketch follows).
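    A rough sketch of that counter idea (my own illustration, using a shared mp.Value as the line index): it is correct, but every claimed line forces a rescan of the file from the start, so the total work grows quadratically.

    import multiprocessing as mp
    from itertools import islice

    current_line = mp.Value('i', 0)  # shared index of the next unclaimed line
    write_lock = mp.Lock()

    def worker():
        while True:
            with current_line.get_lock():
                my_line = current_line.value  # claim one line index
                current_line.value += 1
            # Re-open and skip ahead every time: no shared reader state,
            # but O(n^2) total reading
            with open('input.txt') as f:
                line = next(islice(f, my_line, my_line + 1), None)
            if line is None:
                break
            with write_lock, open('output.txt', 'a') as fo:
                fo.write(line.strip()[::-1] + '\n')

    if __name__ == '__main__':
        open('output.txt', 'w').close()  # start from an empty output file
        procs = [mp.Process(target=worker) for _ in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()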

    Here is a different approach, using a process Pool and its map method: I iterate over the file, and for each line I enqueue your target method:

    from multiprocessing import Pool
    import time

    POOL_NUMBER = 8

    def target(line):
        # Stand-in for some real CPU work on each line
        for _ in range(2**10):
            pass
        return line[::-1]


    pool = Pool(processes=POOL_NUMBER)
    with open('input.txt', 'r') as fi:
        t0 = time.time()
        processed_lines = pool.map(target, fi.readlines())
        print('Total time', time.time() - t0)

    with open('output.txt', 'w') as fo:  # 'w' gives us a clean output file
        for processed_line in processed_lines:
            fo.write(processed_line)


    With 8 processes on my machine: Total time 1.3367934226989746

    With 1 process: Total time 4.324501991271973

    This approach works best if your target function is CPU-bound. A different approach would be to split the file into POOL_NUMBER chunks and have each process write a processed chunk (with a lock!) to the output file, as sketched below.
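    A minimal sketch of that chunk-splitting idea (my own reconstruction; the split_input helper and the chunk_*.txt names are invented for illustration):

    import multiprocessing as mp

    POOL_NUMBER = 4

    def split_input(path, n):
        # Deal lines round-robin into n piece files; return their names
        names = ['chunk_%d.txt' % i for i in range(n)]
        outs = [open(name, 'w') for name in names]
        with open(path) as f:
            for i, line in enumerate(f):
                outs[i % n].write(line)
        for out in outs:
            out.close()
        return names

    def worker(chunk_name, lock):
        with open(chunk_name) as f:
            processed = [line.strip()[::-1] + '\n' for line in f]
        with lock:  # one writer at a time, so blocks never interleave
            with open('output.txt', 'a') as fo:
                fo.writelines(processed)

    if __name__ == '__main__':
        open('output.txt', 'w').close()  # start from an empty output file
        lock = mp.Lock()
        procs = [mp.Process(target=worker, args=(name, lock))
                 for name in split_input('input.txt', POOL_NUMBER)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()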

    Another approach is to create a master process that does the writing for the rest of the processes; here is an example of the pattern.
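    A self-contained sketch of that single-writer pattern (my own version, not the linked example): the workers push results onto a queue, and only one process ever touches output.txt, so no write lock is needed at all.

    import multiprocessing as mp

    N_WORKERS = 4

    def worker(in_q, out_q):
        # Consume lines until the None poison pill, then tell the writer
        for line in iter(in_q.get, None):
            out_q.put(line.strip()[::-1])
        out_q.put(None)

    def writer(out_q, n_workers):
        # The only process that writes to output.txt
        finished = 0
        with open('output.txt', 'w') as fo:
            while finished < n_workers:
                item = out_q.get()
                if item is None:
                    finished += 1
                else:
                    print(item, file=fo)

    if __name__ == '__main__':
        in_q, out_q = mp.Queue(), mp.Queue()
        procs = [mp.Process(target=worker, args=(in_q, out_q))
                 for _ in range(N_WORKERS)]
        procs.append(mp.Process(target=writer, args=(out_q, N_WORKERS)))
        for p in procs:
            p.start()
        with open('input.txt') as fi:
            for line in fi:
                in_q.put(line)
        for _ in range(N_WORKERS):
            in_q.put(None)  # one poison pill per worker
        for p in procs:
            p.join()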

    EDIT

    After your comment, I figured that you can't fit the whole file into memory. For that, you can simply iterate over the file object, which reads it into memory line by line. But then we need to modify the code a bit:

    from multiprocessing import Pool
    import time

    POOL_NUMBER = 8
    CHUNK_SIZE = 50000

    def target(line):
        # This task is hardly measurable, since most of the time will be
        # spent writing the data; with a CPU-bound task this code makes sense
        return line[::-1]


    pool = Pool(processes=POOL_NUMBER)
    open('output.txt', 'w').close()  # make sure we start with an empty file
    processed_lines = []

    with open('input.txt', 'r') as fi:  # iterating reads the file line by line
        t0 = time.time()
        for line in fi:
            # Keep a reference to the async task, but don't block on it yet
            processed_lines.append(pool.apply_async(target, (line,)))

            if len(processed_lines) == CHUNK_SIZE:
                with open('output.txt', 'a') as fo:  # append, or each chunk would overwrite the last
                    for processed_line in processed_lines:
                        fo.write(processed_line.get())
                # Clear the result list so the garbage collector can reclaim
                # the memory; if we never clear it we run out of memory!
                processed_lines = []

        # Flush whatever is left over from the last, partial chunk
        if processed_lines:
            with open('output.txt', 'a') as fo:
                for processed_line in processed_lines:
                    fo.write(processed_line.get())
        print('Total time', time.time() - t0)


    Remember that you can play with the CHUNK_SIZE variable to control how much memory you use; for me, about 5000 per process was the maximum.

    P.S.

    I think the best approach of all is to split the big file into smaller files: that solves the read/write locking on the file, and it also scales across processes (even on different machines!).

        2
  •  0 votes
  •   Fomalhaut  ·  answered 7 years ago

    It turned out that the root cause is the unreliability of line = next(fi) when it is called from different Processes.

    It is possible to work around the use of next(fi) with the help of a temporary buffer of lines that is filled by the main thread of the program and read by each process. For this role it is better to use a multiprocessing.Queue.

    Here is my script:

    from time import sleep, time
    import multiprocessing as mp
    import queue
    
    
    MAX_QUEUE_SIZE = 1000
    QUEUE_TIMEOUT = 0.000001
    POOL_NUMBER = 4
    
    
    def handle(line):
        sleep(0.00001)  # Some processing here that takes time
        return line.strip()[::-1]
    
    
    def target(fout, write_lock, lines_queue):
        # Workers never touch the input file; they only consume whole lines
        # from the queue, and exit once it stays empty for a full second
        while True:
            try:
                line = lines_queue.get(timeout=1.0)
                line = handle(line)
                with write_lock:
                    print(line, file=fout)
                    fout.flush()
            except queue.Empty:
                break
    
    
    if __name__ == "__main__":
        time_begin = time()
    
        with open('output.txt', 'w') as fout:
            write_lock = mp.Lock()
            lines_queue = mp.Queue()
    
            processes = [
                mp.Process(target=target, args=(fout, write_lock, lines_queue))
                for _ in range(POOL_NUMBER)
            ]
            for p in processes:
                p.start()
    
        with open('input.txt', 'r') as fin:
            # Only the main process reads the file, topping the queue up
            # to MAX_QUEUE_SIZE lines so it never grows without bound
            while True:
                try:
                    while lines_queue.qsize() < MAX_QUEUE_SIZE:
                        line = next(fin)
                        lines_queue.put(line)
                    sleep(QUEUE_TIMEOUT)
                except StopIteration:
                    break
    
            for p in processes:
                p.join()
    
        time_end = time()
        print("Time:", time_end - time_begin)
    

    On my CPU I get the following results:

    POOL_NUMBER = 1 -> Time: 17.877086400985718
    POOL_NUMBER = 2 -> Time: 8.611438989639282
    POOL_NUMBER = 3 -> Time: 6.332395553588867
    POOL_NUMBER = 4 -> Time: 5.321753978729248