Using PySpark, I want to get the maximum value across all keys in a pair RDD. The base RDD data looks like this:
Social_Context.take(10):
[(1008044337136001024, 0.9343283582089552),
(1008044334510428160, 3.103463393248575),
(1008044334413852677, 0.7622047244094489),
(1008044333260509185, 0.493006993006993),
(1008044331641593856, 1.6094069529652353),
(1008044329062092801, 0.481981981981982),
(1008044326675460096, 1.3606889564336373),
(1008044325710782469, 0.7228464419475655),
(1008044323370295296, 0.46547314578005117),
(1008044320757354497, 353.8944618599791)]
The key type is int and the value type is float. I'm trying to get the maximum value across all keys:
Social_Context_MAX = Social_Context.map(lambda x : x[1]).max()
print(Social_Context_MAX)
TypeError: 'float' object has no attribute '__getitem__'
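For reference, the same extract-then-max pattern works in plain Python on a well-formed list of (int, float) pairs, which suggests the error comes from some element that is not a 2-tuple. A minimal sketch using a couple of the hypothetical pairs from the sample above:

```python
# Plain-Python sketch of the same pattern: pull the value out of each
# (key, value) pair, then take the maximum. Data is taken from the sample.
pairs = [
    (1008044337136001024, 0.9343283582089552),
    (1008044320757354497, 353.8944618599791),
]

values = [v for _, v in pairs]  # analogous to .map(lambda x: x[1])
print(max(values))             # analogous to .max()  -> 353.8944618599791

# If any element were a bare float instead of a tuple, subscripting it
# with x[1] would raise the TypeError seen above.
```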
I also tried using a Spark DataFrame:
Social_Context_MAX = Social_Context.toDF(["id", "value"])
print(Social_Context_MAX.agg({"value": "max"}).collect()[0][0])
But I got another error:
raise TypeError("Unexpected obj type: %s" % type(obj))
TypeError: Unexpected obj type: <type 'float'>
UPDATE: I read the base data from a JSON file into a DataFrame and then converted it to an RDD. Here is the code in more detail:
raw_data = spark.read.json("../input/Spark_tweets.json")
selected_data = raw_data.select("full_text", "id", "retweet_count", "user", "created_at", "entities")\
.withColumn('verified', udf(getVerified)(raw_data.user))\
.withColumn('followers_count', udf(getFollowerCount)(raw_data.user))\
.withColumn('friends_count', udf(getFriendsCount)(raw_data.user))\
.withColumn("hashtags", udf(getHashtags)(raw_data.entities))\
.drop('user')\
.drop('entities')
Social_Context = selected_data.rdd.map(lambda row : getSocialContext(row))
Social_Context_MAX = Social_Context.map(lambda x : x[1])
print Social_Context_MAX.max(key = lambda x : x[0])
def getSocialContext(row):
    A = int(row[2])
    B = int(row[5])
    C = float(row[6])
    if C == 0:
        return Default_Social_Context
    if A > 0:
        res = (row[1], B * A / C)
        return res
    else:
        res = (row[1], B / C)
        return res
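Note that getSocialContext returns Default_Social_Context when C == 0; if that default happens to be a bare float rather than an (id, value) pair, the later x[1] lookup would fail. A plain-Python sketch of the same function with a tuple-shaped default (the row layout and the default value below are assumptions, not from the original code):

```python
# Hypothetical tuple-shaped default so every return value is an (id, value)
# pair, matching what the downstream .map(lambda x: x[1]) expects.
Default_Social_Context = (None, 0.0)

def get_social_context(row):
    # Row indices as used in the question: row[1] is the id, row[2], row[5]
    # and row[6] are the three numeric fields combined into the score.
    A = int(row[2])
    B = int(row[5])
    C = float(row[6])
    if C == 0:
        return Default_Social_Context
    if A > 0:
        return (row[1], B * A / C)
    return (row[1], B / C)

# Hypothetical row: id=42, A=3, B=10, C=4.0
row = ("text", 42, 3, "user", "date", 10, 4.0)
print(get_social_context(row))  # (42, 7.5)
```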
That is how the Social_Context RDD above is produced.