代码中出现问题的主要原因有两个
-
python内置sum函数的使用。这是一个函数,它取一个数的iterable并返回它们的和。
e、 g.如果您尝试对数据帧df的一个片段求和,您将得到相同的错误回溯
总和(df.loc【1】)
TypeError Traceback (most recent call last)
<ipython-input-60-6dea0ab0880f> in <module>()
----> 1 sum(df.loc[1])
TypeError: unsupported operand type(s) for +: 'int' and 'str'
要解决这个问题,你需要使用熊猫
sum
功能如下所示
df.loc[1].sum()
#output
A 2
B 4
C 6
dtype: int64
如您所见,这将产生预期的结果。i、 e.对数据片中的列求和
-
第二个问题是“减少”阶段。每个进程将返回一个数据帧
结果=排序(结果,键=λx:x[0])
返回pd。concat([i[1]表示结果中的i])
第一行将产生错误,因为任何结果都没有名为0的列。第二行存在类似问题。这可以通过以下方式解决
return pd.concat(result,axis=1)
现在,考虑到所使用的数据,代码将无问题运行。
总体代码:
import multiprocessing as mp
import pandas as pd
import numpy as np
def _apply_df(args):
df, func = args
return df.groupby(level=0).apply(func)
def mp_apply(df, func):
workers = 4
pool = mp.Pool(processes=workers)
split_dfs = np.array_split(df, workers, axis=1)
result = pool.map(_apply_df, [(d, func) for d in split_dfs])
pool.close()
#result = sorted(result, key=lambda x: x[0])
return pd.concat(result,axis=1)
def my_func(x):
return x.sum()
if __name__ == '__main__':
df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
df = df.set_index('cluster_id')
out = mp_apply(df, my_func)
print(out)
输出:
A B C
cluster_id
1 2 4 6
2 11 13 15