代码之家  ›  专栏  ›  技术社区  ›  Yu Chen little_birdie

执行多个数据帧联接时出现Pyspark OutOfMemoryErrors

  •  2
  • Yu Chen little_birdie  · 技术社区  · 6 年前

    关于这个问题有很多帖子,但是没有人回答我的问题。

    我遇到了 OutOfMemoryError 当试图将许多不同的数据帧连接在一起时。

    我的本地机器有16GB的内存,我已经设置了spark配置:

    class SparkRawConsumer:
    
        def __init__(self, filename, reference_date, FILM_DATA):
            self.sparkContext = SparkContext(master='local[*]', appName='my_app')
            SparkContext.setSystemProperty('spark.executor.memory', '3g')
            SparkContext.setSystemProperty('spark.driver.memory', '15g')
    

    很明显,有很多关于spark中oom错误的帖子,但基本上大多数都是为了增加内存属性。

    我基本上是从50-60个较小的数据帧执行连接,这些数据帧有两列 uid data_in_the_form_of_lists (通常是python字符串的列表)。我要加入的主数据帧有大约10列,但也包含 液体 专栏(我正在加入)。

    我只想加入1500行数据。然而,我会经常遇到内存不足的错误,因为很明显所有这些数据都可以放入内存。我在我的仓库里看了看斯巴奎,证实了这一点:

    Spark UI screenshot

    在代码中,我的连接如下所示:

    # lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
    metrics_df = metrics_df.join(
                    self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")
    
    metrics_df.count()
    metrics_df.repartition("gid_value")
    metrics_df = metrics_df.join(
                    self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
                    on="gid_value")
    
    metrics_df.repartition("gid_value")
    metrics_df = metrics_df.join(
                    self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
                    on="uid")
    
    metrics_df.count()
    metrics_df.repartition("gid_value")
    

    在哪里? metric1 我是说, metric2 metric3 在连接之前,我是否将RDD转换为数据帧(请记住,实际上有50个更小的 metric 我要加入的dfs)。

    我打电话给 metric.count() 强制求值,因为它似乎有助于防止内存错误(否则,当尝试最终收集时,我将得到更多的驱动程序错误)。

    这些误差是不确定的。我看不到它们在我的连接中的任何特定位置持续发生,有时似乎正在发生我的最后一次连接 metrics_df.collect() 呼叫,有时在较小的连接期间。

    我真的怀疑任务序列化/反序列化存在一些问题。例如,当我查看一个典型阶段的事件时间线时,我发现大部分时间都被任务反序列化占用:

    Spark UI screenshot serialization

    我还注意到,垃圾收集时间有很多:

    Spark UI screenshot garbage collection

    垃圾回收是导致内存错误的问题吗?还是任务序列化?

    编辑以回答评论问题

    我一直在运行spark作业,作为一个更大的pycharm项目的一部分(因此spark上下文是围绕类包装的)。我使用下面的spark submit重构了代码,将其作为脚本运行:

    spark-submit spark_consumer.py \
      --driver-memory=10G \
      --executor-memory=5G \
      --conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   hamza tuna    6 年前

    我也遇到过类似的问题:
    Spark提交:

    spark-submit --driver-memory 3g\
                --executor-memory 14g\
                *.py
    

    代码:

    sc = SparkContext().getOrCreate()