代码之家  ›  专栏  ›  技术社区  ›  Yevgeni Litvin

在spark中创建行组大小小于100的拼花地板文件

  •  11
  • Yevgeni Litvin  · 技术社区  · 7 年前

    我有一个有少量字段的spark数据框。一些字段是巨大的二进制Blob。整行的大小约为50 MB。

    我正在将数据帧保存为拼花格式。我正在使用控制行组的大小 parquet.block.size 参数

    Spark将生成一个拼花地板文件,但我总是在一个行组中获得至少100行。这对我来说是一个问题,因为块大小可能会变成千兆字节,这在我的应用程序中无法正常工作。

    拼花地板块大小 只要大小足够容纳100多行,就可以正常工作。

    我修改了 InternalParquetRecordWriter.java 成为 MINIMUM_RECORD_COUNT_FOR_CHECK = 2 ,修复了该问题,但是,我找不到支持调整此硬编码常量的配置值。

    是否有其他/更好的方法可以获得小于100的行组大小?

    这是我的代码片段:

    from pyspark import Row
    from pyspark.sql import SparkSession
    import numpy as np
    
    from pyspark.sql.types import StructType, StructField, BinaryType
    
    
    def fake_row(x):
        result = bytearray(np.random.randint(0, 127, (3 * 1024 * 1024 / 2), dtype=np.uint8).tobytes())
        return Row(result, result)
    
    spark_session = SparkSession \
        .builder \
        .appName("bbox2d_dataset_extraction") \
        .config("spark.driver.memory", "12g") \
        .config("spark.executor.memory", "4g")
    
    spark_session.master('local[5]')
    
    spark = spark_session.getOrCreate()
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)
    
    index = sc.parallelize(range(50), 5)
    huge_rows = index.map(fake_row)
    schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])
    
    bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
    bbox2d_dataframe. \
        write.option("compression", "none"). \
        mode('overwrite'). \
        parquet('/tmp/huge/')
    
    2 回复  |  直到 7 年前
        1
  •  3
  •   Pradeep Gollakota    7 年前

    不幸的是,我还没有找到这样做的方法。我报告了 this issue 删除硬编码值并使其可配置。如果你感兴趣的话,我有一个补丁。

        2
  •  2
  •   Tagar    4 年前

    虽然 PARQUET-409 尚未修复,有几个变通方法可以让应用程序使用它 100 硬编码的每个行组的最小记录数。

    第一个问题和解决方法 : 您提到了行的大小可能高达50Mb。 这使得行组大小约为5Gb。 同时,您的spark执行器只有4Gb( spark.executor.memory ). 使其显著大于最大行组大小。
    我建议使用12-20Gb的大spark executor内存 火花执行人。记忆力 . 玩这个游戏,看看哪一个适用于您的数据集。 我们的大多数生产作业都使用此范围内的spark executor内存运行。 为了让这项功能适用于如此大的行组,您可能还需要调低音量 spark.executor.cores 到1,以确保每个执行器进程一次只接受一个这样大的行组。(以牺牲一些火花效率为代价)或许可以试试 火花执行人。核心 设置为2-这可能需要增加 火花执行人。记忆力 至20-31Gb范围。(尽量留下来 under 32Gb 当jvm切换到非压缩OOP时,内存开销可能高达50%)

    第二个问题和解决方法 :5Gb的大行块很可能分布在许多HDFS块上,因为默认HDFS块在128-256Mb范围内。(我假设您使用HDFS存储这些拼花文件,就像使用“hadoop”标记一样)parquet best practice 用于将行组完全驻留在一个HDFS块中:

    行组大小:较大的行组允许较大的列块 可以进行更大的顺序IO。更大的群体也 写入路径中需要更多缓冲(或两次写入)。我们 建议使用大型行组(512MB-1GB)。因为整个行组 可能需要读取,我们希望它完全适合一个HDFS块。 因此,HDFS块大小也应设置为更大。一 优化的读取设置为:1GB行组、1GB HDFS块大小、1 每个HDFS文件的HDFS块。

    下面是如何更改HDFS块大小的示例(在您之前设置 创造 此类拼花文件):

    sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")
    

    或在Spark Scala中:

    sc.hadoopConfiguration.set("dfs.block.size", "5g")
    

    我希望这将是固定在拼花地板水平有时,但这两个解决办法应该允许您与拼花地板操作这样大的行组。