代码之家  ›  专栏  ›  技术社区  ›  user1403546

python-类字段和方法的多处理问题

  •  1
  • user1403546  · 技术社区  · 7 年前

    在一个数据分析python项目中,我需要同时使用类和多处理功能,我还没有在谷歌上找到一个很好的例子。

    我的基本想法——这可能是错误的——是用一个大变量创建一个类(在我的例子中是熊猫数据帧),然后定义一个计算运算的方法(在本例中是求和)。

    import multiprocessing
    import time
    
    class C:
        def __init__(self):
            self.__data = list(range(0, 10**7))
    
        def func(self, nums):
            return sum(nums)
    
        def start_multi(self):
            for n_procs in range(1, 4):
                print()
                time_start = time.clock()
                chunks = [self.__data[(i-1)*len(self.__data)// n_procs: (i)*len(self.__data)// n_procs] for i in range(1, n_procs+1)]
                pool = multiprocessing.Pool(processes=n_procs)
                results = pool.map_async(self.func, chunks )
                results.wait()
                pool.close()
                results = results.get()
                print(sum(results))
                print("n_procs", n_procs, "total time: ", time.clock() - time_start)
    
    print('sum(list(range(0, 10**7)))', sum(list(range(0, 10**7))))
    c = C()
    c.start_multi()
    

    代码无法正常工作:我得到以下打印输出

    sum(list(range(0, 10**7))) 49999995000000
    
    49999995000000
    n_procs 1 total time:  0.45133500000000026
    
    49999995000000
    n_procs 2 total time:  0.8055279999999954
    
    49999995000000
    n_procs 3 total time:  1.1330870000000033
    

    但我也担心RAM的使用情况,因为当创建变量块时,自身__数据RAM使用率翻了一番。在处理多处理代码时,特别是在这段代码中,是否可能避免这种内存浪费?(我保证将来我会把一切都点燃:)

    1 回复  |  直到 7 年前
        1
  •  1
  •   Paul    7 年前

    这里似乎有一些事情在起作用:

    1. 分块操作相当慢。在我的电脑上 chunks 对于有多个过程的案例,大约需要16%的时间。单进程、非池版本没有这种开销。
    2. 您正在向流程发送大量数据。这个 数组是需要获取的范围的所有原始数据 pickled
    3. 一般来说,如果你把计时器放在 func 你会发现大部分时间都没有在那里度过。这就是为什么你没有看到加速。大部分时间花在大块、酸洗、叉子和其他开销上。

    作为替代方案,您应该尝试切换分块技术,只计算开始和结束数字,并避免发送过多数据。

    接下来,我建议做一些比计算总和更难的计算。例如,可以尝试计算素数。这里有一个例子,我们使用简单的素数计算 here 我们使用了一种改进的分块技术。否则,尝试保持代码不变。

    import multiprocessing
    import time
    from math import sqrt; from itertools import count, islice
    
    # credit to https://stackoverflow.com/a/27946768
    def isPrime(n):
        return n > 1 and all(n%i for i in islice(count(2), int(sqrt(n)-1)))
    
    limit = 6
    class C:
        def __init__(self):
            pass
    
        def func(self, start_end_tuple):
            start, end = start_end_tuple
            primes = []
            for x in range(start, end):
                if isPrime(x):
                    primes.append(x)
            return len(primes)
    
        def get_chunks(self, total_size, n_procs):
            # start and end value tuples
            chunks = []
    
            # Example: (10, 5) -> (2, 0) so 2 numbers per process
            # (10, 3) -> (3, 1) or here the first process does 4 and the others do 3
            quotient, remainder = divmod(total_size, n_procs)
            current_start = 0
            for i in range(0, n_procs):
                my_amount = quotient
                if i == 0:
                    # somebody needs to do extra
                    my_amount += remainder
                chunks.append((current_start, current_start + my_amount))
                current_start += my_amount
            return chunks
    
        def start_multi(self):
            for n_procs in range(1, 4):
                time_start = time.clock()
                # chunk the start and end indices instead
                chunks = self.get_chunks(10**limit, n_procs)
                pool = multiprocessing.Pool(processes=n_procs)
                results = pool.map_async(self.func, chunks)
                results.wait()
                results = results.get()
                print(sum(results))
                time_delta = time.clock() - time_start
                print("n_procs {} time {}".format(n_procs, time_delta))
    
    c = C()
    time_start = time.clock()
    print("serial func(...) = {}".format(c.func((1, 10**limit))))
    print("total time {}".format(time.clock() - time_start))
    c.start_multi()
    

    这将导致多个进程的加速。假设你有它的核心。