代码之家  ›  专栏  ›  技术社区  ›  Carmen Pérez Carrillo

ArrayColumn Pyspark上的计数器函数

  •  3
  • Carmen Pérez Carrillo  · 技术社区  · 6 年前

    从此数据框

    +-----+-----------------+
    |store|     values      |
    +-----+-----------------+
    |    1|[1, 2, 3,4, 5, 6]|
    |    2|            [2,3]|
    +-----+-----------------+
    

    我想申请 Counter 函数以获取:

    +-----+------------------------------+
    |store|     values                   |
    +-----+------------------------------+
    |    1|{1:1, 2:1, 3:1, 4:1, 5:1, 6:1}|
    |    2|{2:1, 3:1}                    |
    +-----+------------------------------+
    

    我用另一个问题的答案得到了这个数据框:

    GroupBy and concat array columns pyspark

    因此,我尝试修改答案中的代码,如下所示:

    选项1:

    def flatten_counter(val):
        return Counter(reduce (lambda x, y:x+y, val))
    
    udf_flatten_counter = sf.udf(flatten_counter,     ty.ArrayType(ty.IntegerType()))
    df3 = df2.select("store", flatten_counter("values2").alias("values3"))
    df3.show(truncate=False)
    

    选项2:

    df.rdd.map(lambda r: (r.store, r.values)).reduceByKey(lambda x, y: x + y).map(lambda row: Counter(row[1])).toDF(['store', 'values']).show()
    

    但它不起作用。

    有人知道我怎么做吗?

    非常感谢。

    1 回复  |  直到 5 年前
        1
  •  8
  •   Alper t. Turker    6 年前

    您只需提供正确的数据类型

    udf_flatten_counter = sf.udf(
        lambda x: dict(Counter(x)),
        ty.MapType(ty.IntegerType(), ty.IntegerType()))
    
    df = spark.createDataFrame(
       [(1, [1, 2, 3, 4, 5, 6]), (2, [2, 3])], ("store", "values"))
    
    
    df.withColumn("cnt", udf_flatten_counter("values")).show(2, False)
    # +-----+------------------+---------------------------------------------------+
    # |store|values            |cnt                                                |
    # +-----+------------------+---------------------------------------------------+
    # |1    |[1, 2, 3, 4, 5, 6]|Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)|
    # |2    |[2, 3]            |Map(2 -> 1, 3 -> 1)                                |
    # +-----+------------------+---------------------------------------------------+
    

    与RDD类似

    df.rdd.mapValues(Counter).mapValues(dict).toDF(["store", "values"]).show(2, False)
    # +-----+---------------------------------------------------+
    # |store|values                                             |
    # +-----+---------------------------------------------------+
    # |1    |Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)|
    # |2    |Map(2 -> 1, 3 -> 1)                                |
    # +-----+---------------------------------------------------+
    

    转换为 dict 因为显然辉绿岩无法处理 Counter 物体。