关于这个问题有很多帖子,但是没有人回答我的问题。
我遇到了
OutOfMemoryError
当试图将许多不同的数据帧连接在一起时。
我的本地机器有16GB的内存,我已经设置了spark配置:
class SparkRawConsumer:
def __init__(self, filename, reference_date, FILM_DATA):
self.sparkContext = SparkContext(master='local[*]', appName='my_app')
SparkContext.setSystemProperty('spark.executor.memory', '3g')
SparkContext.setSystemProperty('spark.driver.memory', '15g')
很明显,有很多关于spark中oom错误的帖子,但基本上大多数都是为了增加内存属性。
我基本上是从50-60个较小的数据帧执行连接,这些数据帧有两列
uid
和
data_in_the_form_of_lists
(通常是python字符串的列表)。我要加入的主数据帧有大约10列,但也包含
液体
专栏(我正在加入)。
我只想加入1500行数据。然而,我会经常遇到内存不足的错误,因为很明显所有这些数据都可以放入内存。我在我的仓库里看了看斯巴奎,证实了这一点:
在代码中,我的连接如下所示:
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
on="gid_value")
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
在哪里?
metric1
我是说,
metric2
和
metric3
在连接之前,我是否将RDD转换为数据帧(请记住,实际上有50个更小的
metric
我要加入的dfs)。
我打电话给
metric.count()
强制求值,因为它似乎有助于防止内存错误(否则,当尝试最终收集时,我将得到更多的驱动程序错误)。
这些误差是不确定的。我看不到它们在我的连接中的任何特定位置持续发生,有时似乎正在发生我的最后一次连接
metrics_df.collect()
呼叫,有时在较小的连接期间。
我真的怀疑任务序列化/反序列化存在一些问题。例如,当我查看一个典型阶段的事件时间线时,我发现大部分时间都被任务反序列化占用:
我还注意到,垃圾收集时间有很多:
垃圾回收是导致内存错误的问题吗?还是任务序列化?
编辑以回答评论问题
我一直在运行spark作业,作为一个更大的pycharm项目的一部分(因此spark上下文是围绕类包装的)。我使用下面的spark submit重构了代码,将其作为脚本运行:
spark-submit spark_consumer.py \