正如Anony Mouse所评论的那样,(Py)Spark ML确实是
很
更有限的是,scikit学习或其他类似的软件包,这样的功能并不微不足道;然而,这里有一种方法可以获得您想要的(集群统计):
spark.version
# u'2.2.0'
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
# toy data - 5-d features including sparse vectors
df = spark.createDataFrame(
[(Vectors.sparse(5,[(0, 164.0),(1,520.0)]), 1.0),
(Vectors.dense([519.0,2723.0,0.0,3.0,4.0]), 1.0),
(Vectors.sparse(5,[(0, 2868.0), (1, 928.0)]), 1.0),
(Vectors.sparse(5,[(0, 57.0), (1, 2715.0)]), 0.0),
(Vectors.dense([1241.0,2104.0,0.0,0.0,2.0]), 1.0)],
["features", "target"])
df.show()
# +--------------------+------+
# | features|target|
# +--------------------+------+
# |(5,[0,1],[164.0,5...| 1.0|
# |[519.0,2723.0,0.0...| 1.0|
# |(5,[0,1],[2868.0,...| 1.0|
# |(5,[0,1],[57.0,27...| 0.0|
# |[1241.0,2104.0,0....| 1.0|
# +--------------------+------+
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).select("features", "prediction")
transformed.show()
# +--------------------+----------+
# | features|prediction|
# +--------------------+----------+
# |(5,[0,1],[164.0,5...| 1|
# |[519.0,2723.0,0.0...| 2|
# |(5,[0,1],[2868.0,...| 0|
# |(5,[0,1],[57.0,27...| 2|
# |[1241.0,2104.0,0....| 2|
# +--------------------+----------+
到目前为止,关于你的第一个问题:
我如何解释
transformed
?
这个
features
列只是原始数据中同一列的复制。
这个
prediction
列是各个数据记录所属的集群;在我的示例中,有5条数据记录和
k=3
集群中,我在集群#0中有1条记录,在集群#1中有1条记录,在集群#2中有3条记录。
关于第二个问题:
如何从中创建一个或多个Pandas数据帧
转化
这将显示14个集群中每个集群的13个特性中的每个特性的摘要统计信息?
(注意:似乎你有
14
功能,而不是13…)
这是一个看似简单的任务的好例子,不幸的是,PySpark并没有提供现成的功能,尤其是因为所有功能都分组在一个
仅有一个的
矢量
特征
; 为此,我们必须首先“拆解”
特征
,有效地提出了
倒转
的操作
VectorAssembler
.
目前我能想到的唯一方法是暂时恢复到RDD并执行
map
操作[编辑:这不是真的必要-请参阅下面的更新];下面是我上面的集群#2的一个示例,其中包含密集向量和稀疏向量:
# keep only cluster #2:
cl_2 = transformed.filter(transformed.prediction==2)
cl_2.show()
# +--------------------+----------+
# | features|prediction|
# +--------------------+----------+
# |[519.0,2723.0,0.0...| 2|
# |(5,[0,1],[57.0,27...| 2|
# |[1241.0,2104.0,0....| 2|
# +--------------------+----------+
# set the data dimensionality as a parameter:
dimensionality = 5
cluster_2 = cl_2.drop('prediction').rdd.map(lambda x: [float(x[0][i]) for i in range(dimensionality)]).toDF(schema=['x'+str(i) for i in range(dimensionality)])
cluster_2.show()
# +------+------+---+---+---+
# | x0| x1| x2| x3| x4|
# +------+------+---+---+---+
# | 519.0|2723.0|0.0|3.0|4.0|
# | 57.0|2715.0|0.0|0.0|0.0|
# |1241.0|2104.0|0.0|0.0|2.0|
# +------+------+---+---+---+
(如果您在Spark数据框中有初始数据
initial_data
,您可以将最后一部分更改为
toDF(schema=initial_data.columns)
从这一点上,您可以转换
cluster_2
将数据帧转换为一个数据帧(如果它适合您的内存),或者使用
describe()
Spark dataframes的功能用于获取摘要统计信息:
cluster_2.describe().show()
# result:
+-------+-----------------+-----------------+---+------------------+---+
|summary| x0| x1| x2| x3| x4|
+-------+-----------------+-----------------+---+------------------+---+
| count| 3| 3| 3| 3| 3|
| mean|605.6666666666666| 2514.0|0.0| 1.0|2.0|
| stddev|596.7389155512932|355.0929455790413|0.0|1.7320508075688772|2.0|
| min| 57.0| 2104.0|0.0| 0.0|0.0|
| max| 1241.0| 2723.0|0.0| 3.0|4.0|
+-------+-----------------+-----------------+---+------------------+---+
将上述代码用于
dimensionality=14
在你的情况下,应该做这项工作。。。
对所有这些(可以说是无用的)重要数字感到恼火
mean
和
stddev
? 作为奖励,这里有一个我提出的小效用函数
some time ago
对于一个漂亮的总结:
def prettySummary(df):
""" Neat summary statistics of a Spark dataframe
Args:
pyspark.sql.dataframe.DataFrame (df): input dataframe
Returns:
pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
"""
import pandas as pd
temp = df.describe().toPandas()
temp.iloc[1:3,1:] = temp.iloc[1:3,1:].convert_objects(convert_numeric=True)
pd.options.display.float_format = '{:,.2f}'.format
return temp
stats_df = prettySummary(cluster_2)
stats_df
# result:
summary x0 x1 x2 x3 x4
0 count 3 3 3 3 3
1 mean 605.67 2,514.00 0.00 1.00 2.00
2 stddev 596.74 355.09 0.00 1.73 2.00
3 min 57.0 2104.0 0.0 0.0 0.0
4 max 1241.0 2723.0 0.0 3.0 4.0
更新
:再想一想,看看您的示例数据,我想出了一个更简单的解决方案,不需要调用中间RDD(如果可能的话,人们可能更愿意避免这种操作)。。。
关键的观察是
,即。
没有
这个
select
声明;保持与上述相同的玩具数据集,我们得到:
transformed = model.transform(df) # no 'select' statements
transformed.show()
# +--------------------+------+----------+
# | features|target|prediction|
# +--------------------+------+----------+
# |(5,[0,1],[164.0,5...| 1.0| 1|
# |[519.0,2723.0,0.0...| 1.0| 2|
# |(5,[0,1],[2868.0,...| 1.0| 0|
# |(5,[0,1],[57.0,27...| 0.0| 2|
# |[1241.0,2104.0,0....| 1.0| 2|
# +--------------------+------+----------+
如您所见,数据帧中存在的任何其他列
df
被改造(就我而言只有一个)-
target
希望你开始明白这个想法:如果
测向
包含最初的14个功能,每个功能在单独的一列中,再加上第15列,名为
特征
(大致如示例数据所示,但没有最后一列),然后是以下代码:
kmeans = KMeans().setK(14)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).drop('features')
将留给您一个Spark数据帧
转化
包含15列,即您最初的14个功能加上一个
预言
具有相应群集编号的列。
从这一点上,您可以继续,正如我上面所示
filter
转化
并获得您的摘要统计数据,但您将避免(昂贵的…)转换为中间临时RDD,从而将所有操作保持在Spark数据帧的更高效上下文中。。。