代码之家  ›  专栏  ›  技术社区  ›  bluesummers

通过Spark创建parquet Petastorm数据集失败,出现溢出错误(大于4GB)

  •  1
  • bluesummers  · 技术社区  · 6 年前

    我正在尝试实现Uber的Petastorm数据集创建,该数据集使用Spark创建一个拼花地板文件,该文件遵循他们的教程 Github page .

    守则:

    spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
    sc = spark.sparkContext
    
    with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                             schema=MySchema, row_group_size_mb=256):
    
        logging.info('Building RDD...')
        rows_rdd = sc.parallelize(ids)\
            .map(row_generator)\  # Generator that yields lists of examples
            .flatMap(lambda x: dict_to_spark_row(MySchema, x))
    
        logging.info('Creating DataFrame...')
        spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet('file:///opt/data/hello_world_dataset')
    

    现在,RDD代码成功执行,但只有 .createDataFrame 调用时出现以下错误:

    这是我第一次使用Spark,所以我真的不知道这个错误是源于Spark还是Petastorm。

    通过查看此错误的其他解决方案(关于Spark,而不是Petastorm),我发现这可能与酸洗协议有关,但我无法确认这一点,我也没有找到改变酸洗协议的方法。

    我怎样才能避免这个错误呢?

    2 回复  |  直到 6 年前
        1
  •  2
  •   bluesummers    6 年前

    问题在于为在不同进程之间传递数据而进行的pickle,默认pickle协议是2,我们需要使用4来传递大于4GB的对象。

    要更改酸洗协议,请在创建Spark会话之前,使用以下代码

    from pyspark import broadcast
    import pickle
    
    
    def broadcast_dump(self, value, f):
        pickle.dump(value, f, 4)  # was 2, 4 is first protocol supporting >4GB
        f.close()
    
        return f.name
    
    
    broadcast.Broadcast.dump = broadcast_dump
    
        2
  •  2
  •   lockwobr    5 年前

    建立蓝夏人的答案

    The master branch of spark right now fixes

    from pyspark import broadcast
    from pyspark.cloudpickle import print_exec
    import pickle
    
    def broadcast_dump(self, value, f):
        try:
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) 
        except pickle.PickleError:
            raise
        except Exception as e:
            msg = "Could not serialize broadcast: %s: %s" \
                    % (e.__class__.__name__, _exception_message(e))
            print_exec(sys.stderr)
            raise pickle.PicklingError(msg)
        f.close()
    
    broadcast.Broadcast.dump = broadcast_dump
    
    推荐文章