You just have to provide the correct return data type:
from collections import Counter
from pyspark.sql import functions as sf, types as ty

udf_flatten_counter = sf.udf(
    lambda x: dict(Counter(x)),
    ty.MapType(ty.IntegerType(), ty.IntegerType()))
df = spark.createDataFrame(
[(1, [1, 2, 3, 4, 5, 6]), (2, [2, 3])], ("store", "values"))
df.withColumn("cnt", udf_flatten_counter("values")).show(2, False)
# +-----+------------------+---------------------------------------------------+
# |store|values |cnt |
# +-----+------------------+---------------------------------------------------+
# |1 |[1, 2, 3, 4, 5, 6]|Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)|
# |2 |[2, 3] |Map(2 -> 1, 3 -> 1) |
# +-----+------------------+---------------------------------------------------+
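The resulting map column supports per-key lookups. A minimal usage sketch (the alias count_of_2 is just for illustration, not from the original):

result = df.withColumn("cnt", udf_flatten_counter("values"))
# Look up the count for key 2 in each row's map (null where the key is absent).
result.select("store", result["cnt"][2].alias("count_of_2")).show()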
The same works with an RDD:
df.rdd.mapValues(Counter).mapValues(dict).toDF(["store", "values"]).show(2, False)
# +-----+---------------------------------------------------+
# |store|values |
# +-----+---------------------------------------------------+
# |1 |Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)|
# |2 |Map(2 -> 1, 3 -> 1) |
# +-----+---------------------------------------------------+
We convert to dict because apparently Pyrolite cannot handle Counter objects.
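If you would rather avoid a Python UDF altogether, here is a sketch of an alternative built entirely from Spark SQL functions (this assumes Spark 2.4+, which introduced map_from_entries; it is not part of the original answer):

counts = (df
    .select("store", sf.explode("values").alias("v"))  # one row per array element
    .groupBy("store", "v").count()                     # occurrences of each value per store
    .groupBy("store")
    .agg(sf.map_from_entries(                          # rebuild a map<v, count> column
        sf.collect_list(sf.struct("v", "count"))).alias("cnt")))
counts.show(2, False)

Because it never ships Python objects between the JVM and the Python worker, this version sidesteps the Pyrolite serialization issue entirely.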