代码之家  ›  专栏  ›  技术社区  ›  jk1

对partitionBy创建的一个输出目录中的数据进行排序

  •  0
  • jk1  · 技术社区  · 3 年前

    我有一个很大的地理空间数据集分区,按quadkey的级别5。 在每个qk5级别的目录中,大约有1-50 Gb的数据,因此它不适合放在一个文件中。我想在进行地理空间查询时从下推过滤器中受益。所以我希望一个qk5分区中的文件按更高的qk分辨率排序(比如说四键级别10)。 问题:有没有办法在partitionBy batch中对数据进行排序? 例如

    qk5=00001/
        part1.parquet
        part2.parquet
        part3.parquet
        part4.parquet
    ...
    
    qk5=33333/
        part10000.parquet
        part20000.parquet
        part30000.parquet
        part40000.parquet
    

    我想让来自part1.parquet、part2.parquet,part3.parquet和part4.parquet的数据按“qk10”列排序。

    这是当前的代码,但它只提供在一个特定分区内的排序(例如part1.parquet):

    // Parquet save
    preExportRdd.toDF
      .repartition(partitionsNumber, $"salt")
      .sortWithinPartitions($"qk10")
      .drop("salt")
      .write
      .partitionBy("qk")
      .format("parquet")
      .option("compression", "gzip")
      .mode(SaveMode.Append)
      .save(exportUrl)
    
    0 回复  |  直到 3 年前
        1
  •  1
  •   Gabio    3 年前

    问题是,您没有按照以下方式对数据帧进行全局排序 qk 字段及其原因相同 qk 值分布在不同的火花分区中。 在写入阶段,由于 partitionBy("qk") ,写入特定物理分区(文件夹)的输出可能来自不同的spark分区,这会导致输出数据未排序。

    请尝试以下操作:

    preExportRdd.toDF
      .repartitionByRange(partitionsNumber, $"qk", $"qk10", $"salt")
      .sortWithinPartitions($"qk10")
      .drop("salt")
      .write
      .partitionBy("qk")
      .format("parquet")
      .option("compression", "gzip")
      .mode(SaveMode.Append)
      .save(exportUrl)
    

    这个 repartitionByRange 将根据提供的列对Dataframe进行排序,并将排序后的Dataframe拆分为所需数量的分区。