代码之家  ›  专栏  ›  技术社区  ›  Michael

如何用字典正确使用reduce

  •  0
  • Michael  · 技术社区  · 5 年前

    我正在使用自定义函数作为reduce操作的一部分。对于以下示例,我收到以下消息 TypeError: reduce() takes no keyword arguments -我相信这是由于我使用字典的方式造成的 mapping 在函数中 exposed_colum -你能帮我修一下这个功能吗?

    from pyspark.sql import DataFrame, Row
    from pyspark.sql.functions import col
    from pyspark.sql import SparkSession
    from functools import reduce
    
    
    def process_data(df: DataFrame):
        col_mapping = dict(zip(["name", "age"], ["a", "b"]))
    
        # Do other things...
    
        def exposed_column(df: DataFrame, mapping: dict):
            return df.select([col(c).alias(mapping.get(c, c)) for c in df.columns])
    
        return reduce(exposed_column, sequence=col_mapping, initial=df)
    
    
    spark = SparkSession.builder.appName("app").getOrCreate()
    l = [
        ("Bob", 25, "Spain"),
        ("Marc", 22, "France"),
        ("Steve", 20, "Belgium"),
        ("Donald", 26, "USA"),
    ]
    rdd = spark.sparkContext.parallelize(l)
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]), country=x[2])).toDF()
    
    people.show()
    process_data(people).show()
    

    people.show() 看起来像这样

    +---+-------+------+
    |age|country|  name|
    +---+-------+------+
    | 25|  Spain|   Bob|
    | 22| France|  Marc|
    | 20|Belgium| Steve|
    | 26|    USA|Donald|
    +---+-------+------+
    

    这是预期的输出

    +------+---+
    |     a|  b|
    +------+---+
    |   Bob| 25|
    |  Marc| 22|
    | Steve| 20|
    |Donald| 26|
    +------+---+
    
    0 回复  |  直到 5 年前
        1
  •  2
  •   Oliver W.    5 年前

    reduce 不接受关键字,这是真的。 一旦你删除了关键字,你会注意到一个更严重的问题:当你迭代字典时,你只迭代它的关键字。因此,您尝试批量重命名列的函数不会达到您的目的。

    执行批处理列重命名的一种方法是迭代字典 items :

    from typing import Mapping
    from pyspark.sql import DataFrame
    
    def rename_columns(frame: DataFrame, mapping: Mapping[str, str]) -> DataFrame:
        return reduce(lambda f, old_new: f.withColumnRenamed(old_new[0], old_new[1]),
                      mapping.items(), frame)
    

    这允许您传入词典(请注意 the recommendation 为参数添加类型提示是使用 Mapping ,不 dict )将列名映射到其他名称。幸运的是, withColumnRenamed 如果您尝试重命名不在列表中的列,则不会抱怨 DataFrame ,所以这相当于你的 mapping.get(c, c) .

    我在你的代码中没有注意到的一件事是,它正在删除 country 列。所以这仍然会在你的输出中。