代码之家  ›  专栏  ›  技术社区  ›  Nevermore

返回Dask应用程序中的结构化行

  •  0
  • Nevermore  · 技术社区  · 3 年前

    我正在对中的所有行应用函数 Dask数据帧 在里面 PySpark 我可以返回 spark.sql.Row 对象,以便为结果创建结构化行 DataFrame .在对dask数据帧中的行应用函数时,如何返回结构相似的行(包含列和类型)?

    我正在沿着以下路线寻找一些东西:

    # df is a dask.dataframe with a JSON blob in the `data` column
    
    def process(row):
        json_data = json.loads(row.data)
        return Row(a=json_data["a"], b=json_data["b")
    
    result = df.apply(
        process,
        axis=1,
    ).compute()
    
    result
    

    我看到那一排排就是它们自己 pd.Series ,所以我试过了 process 返回 Series 但是我得到了这个错误

    AttributeError: 'Series' object has no attribute 'columns'
    

    这个 documentation 建议我可以使用 meta 中的参数 apply :

    meta:与输出的数据类型和列名匹配的空pd.DataFrame或pd.Series。。。可以提供[Inputs like]iterable of(name,dtype)(注意,名称的顺序应该与列的顺序匹配)

    但是,当我使用 iterable 如建议的那样,元元组的数量

    result = df.apply(
        process,
        axis=1,
        meta=[("a", "int")]
    ).compute()
    

    它期待着一个 DataFrame 对象,并返回此错误

    AttributeError: 'DataFrame' object has no attribute 'name'
    
    0 回复  |  直到 3 年前
        1
  •  3
  •   SultanOrazbayev    3 年前

    这是一个围绕panda函数开发的dask包装器 here :

    # see unutbu's answer here: https://stackoverflow.com/a/25512372/10693596
    import json
    def json_to_series(text):
        keys, values = zip(*[item for dct in json.loads(text) for item in dct.items()])
        return pd.Series(values, index=keys)
    
    
    def process_chunk(df):
        _tmp = df['data'].apply(json_to_series)
        return pd.concat([df, _tmp], axis=1)
    
    result = df.map_partitions(process_chunk).compute()