代码之家  ›  专栏  ›  技术社区  ›  Aenaon

分组依据/应用熊猫和多处理

  •  3
  • Aenaon  · 技术社区  · 6 年前

    我正在尝试使用多处理在熊猫数据帧上执行groupby和apply操作(希望加快代码的速度)。例如,如果我有如下数据帧:

                A  B  C
    cluster_id         
    1           1  2  3
    1           1  2  3
    2           4  5  6
    2           7  8  9
    

    我想在列上应用一个函数,并根据cluster\u id对它们进行分组

    def my_func(x):
        return sum(x)
    

    然后操作应产生:

                A   B   C
    cluster_id         
    1           2   4   6
    2           11  13  15
    

    有一些类似的帖子,所以,我确实设法找到了一个接近的地方,但还没有真正解决它。我的代码失败了,我不知道如何修复它。这就是我想到的

    import multiprocessing as mp
    import pandas as pd
    import numpy as np
    
    
    def _apply_df(args):
        df, func = args
        return df.groupby(level=0).apply(func)
    
    
    def mp_apply(df, func):
        workers = 4
        pool = mp.Pool(processes=workers)
        split_dfs = np.array_split(df, workers, axis=1)
        result = pool.map(_apply_df, [(d, func) for d in split_dfs])
        pool.close()
        result = sorted(result, key=lambda x: x[0])
        return pd.concat([i[1] for i in result])
    
    
    def my_func(x):
        return sum(x)
    
    
    if __name__ == '__main__':
        df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
        df = df.set_index('cluster_id')
        out = mp_apply(df, my_func)
        print(out)
    

    我收到错误:

      TypeError: unsupported operand type(s) for +: 'int' and 'str'
    

    看起来它在线路上失败了

    result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    

    参数 d 已传递到 _apply_df 看起来是空的。

    非常感谢您的帮助/想法。如果有必要的话,我正在使用Python 3.6。谢谢

    1 回复  |  直到 6 年前
        1
  •  2
  •   sgDysregulation    6 年前

    代码中出现问题的主要原因有两个

    1. python内置sum函数的使用。这是一个函数,它取一个数的iterable并返回它们的和。 e、 g.如果您尝试对数据帧df的一个片段求和,您将得到相同的错误回溯

    总和(df.loc【1】)

    TypeError                                 Traceback (most recent call last)
        <ipython-input-60-6dea0ab0880f> in <module>()
        ----> 1 sum(df.loc[1])
    TypeError: unsupported operand type(s) for +: 'int' and 'str'
    

    要解决这个问题,你需要使用熊猫 sum 功能如下所示

    df.loc[1].sum()
    
    #output 
    A    2
    B    4
    C    6
    dtype: int64
    

    如您所见,这将产生预期的结果。i、 e.对数据片中的列求和

    1. 第二个问题是“减少”阶段。每个进程将返回一个数据帧

      结果=排序(结果,键=λx:x[0])

      返回pd。concat([i[1]表示结果中的i])

    第一行将产生错误,因为任何结果都没有名为0的列。第二行存在类似问题。这可以通过以下方式解决

    return pd.concat(result,axis=1)
    

    现在,考虑到所使用的数据,代码将无问题运行。

    总体代码:

    import multiprocessing as mp
    import pandas as pd
    import numpy as np
    
    
    def _apply_df(args):
        df, func = args
        return df.groupby(level=0).apply(func)
    
    
    def mp_apply(df, func):
        workers = 4
        pool = mp.Pool(processes=workers)
        split_dfs = np.array_split(df, workers, axis=1)
        result = pool.map(_apply_df, [(d, func) for d in split_dfs])
        pool.close()
        #result = sorted(result, key=lambda x: x[0])
        return pd.concat(result,axis=1)
    
    
    def my_func(x):
        return x.sum()
    
    
    if __name__ == '__main__':
        df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
        df = df.set_index('cluster_id')
        out = mp_apply(df, my_func)
        print(out)
    

    输出:

                 A   B   C
    cluster_id            
    1            2   4   6
    2           11  13  15