代码之家  ›  专栏  ›  技术社区  ›  pylearner

python到pyspark,在pyspark中转换枢轴

  •  2
  • pylearner  · 技术社区  · 7 年前

    我已经在DataFrame下面完成了操作,并在python中实现了所需的输出。但我想把它转换成pyspark。

    d = {'user': ['A', 'A', 'B','B','C', 'D', 'C', 'E', 'D', 'E', 'F', 'F'], 'songs' : [11,22,99,11,11,44,66,66,33,55,11,77]}
    data = pd.DataFrame(data = d)
    
    
    e = {'user': ['A', 'B','C', 'D',  'E', 'F','A'], 'cluster': [1,2,3,1,2,3,2]}
    clus = pd.DataFrame(data= e)
    

    期望的输出:我想获得特定集群用户没有听过的所有歌曲。 A belongs to cluster 1, and cluster 1 has songs [11,22,33,44] so A hasnt listened to [33,44] 因此,我使用下面的python代码实现了这一点。

    user
    A    [33, 44]
    B    [55, 66]
    C        [77]
    D    [11, 22]
    E    [11, 99]
    F        [66]
    

    PYTHON代码:

    df = pd.merge(data, clus, on='user', how='left').drop_duplicates(['user','movie'])
    
    df1 = (df.groupby(['cluster']).apply(lambda x: x.pivot('user','movie','cluster').isnull())
            .fillna(False)
            .reset_index(level=0, drop=True)
            .sort_index())
    
    s = np.where(df1, ['{}'.format(x) for x in df1.columns], '')
    
    #remove empty values
    s1 = pd.Series([''.join(x).strip(', ') for x in s], index=df1.index)
    print (s1)
    

    在pyspark分布式编码中实现同样的功能很热门吗?

    1 回复  |  直到 7 年前
        1
  •  1
  •   mayank agrawal    7 年前

    可能有比这更好的解决方案,但它确实有效。

    假设每个用户只属于一个集群,

    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    
    d = zip(['A', 'A', 'B','B','C', 'D', 'C', 'E', 'D', 'E', 'F', 'F'],[11,22,99,11,11,44,66,66,33,55,11,77])
    data = sql.createDataFrame(d).toDF('user','songs')
    

    这给了,

    +----+-----+
    |user|songs|
    +----+-----+
    |   A|   11|
    |   A|   22|
    |   B|   99|
    |   B|   11|
    |   C|   11|
    |   D|   44|
    |   C|   66|
    |   E|   66|
    |   D|   33|
    |   E|   55|
    |   F|   11|
    |   F|   77|
    +----+-----+
    

    假设每个用户只属于一个集群,则创建集群,

    c = zip(['A', 'B','C', 'D',  'E', 'F'],[1,2,3,1,2,3])
    clus = sql.createDataFrame(c).toDF('user','cluster')
    clus.show()
    
    +----+-------+
    |user|cluster|
    +----+-------+
    |   A|      1|
    |   B|      2|
    |   C|      3|
    |   D|      1|
    |   E|      2|
    |   F|      3|
    +----+-------+
    

    现在,我们可以获得用户及其群集听到的所有歌曲,

    all_combine = data.groupBy('user').agg(F.collect_list('songs').alias('songs'))\
                      .join(clus, data.user==clus.user).select(data['user'],'songs','cluster')
    all_combine.show()
    +----+--------+-------+                                                         
    |user|   songs|cluster|
    +----+--------+-------+
    |   F|[11, 77]|      3|
    |   E|[66, 55]|      2|
    |   B|[99, 11]|      2|
    |   D|[44, 33]|      1|
    |   C|[11, 66]|      3|
    |   A|[11, 22]|      1|
    +----+--------+-------+
    

    最后,计算群集中听到的所有歌曲,以及随后该群集中用户未听到的所有歌曲,

    not_listened = F.udf(lambda song,all_: list(set(all_) - set(song)) , ArrayType(IntegerType()))
    
    grouped_clusters = data.join(clus, data.user==clus.user).select(data['user'],'songs','cluster')\
                        .groupby('cluster').agg(F.collect_list('songs').alias('all_songs'))\
                        .join(all_combine, ['cluster']).select('user', all_combine['cluster'], 'songs', 'all_songs')\
                        .select('user', not_listened(F.col('songs'), F.col('all_songs')).alias('not_listened'))
    grouped_clusters.show()
    

    我们得到的输出为,

    +----+------------+                                                             
    |user|not_listened|
    +----+------------+
    |   D|    [11, 22]|
    |   A|    [33, 44]|
    |   F|        [66]|
    |   C|        [77]|
    |   E|    [99, 11]|
    |   B|    [66, 55]|
    +----+------------+