代码之家  ›  专栏  ›  技术社区  ›  Metadata

使用spark将大型数据集/数据加载到配置单元表中的最佳策略是什么?[复制品]

  •  0
  • Metadata  · 技术社区  · 6 年前

    我正在尝试将数据从postgresql表中的表移动到hdfs上的配置单元表。为此,我提出了以下代码:

      val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
      val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
      def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
            val colList                = allColumns.split(",").toList
            val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
            val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
            val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
            val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                          .option("user", devUserName).option("password", devPassword)
                                                                          .option("partitionColumn","cast_id")
                                                                          .option("lowerBound", 1).option("upperBound", 100000)
                                                                          .option("numPartitions",70).load()
            val totalCols:List[String] = splitColumns ++ textList
            val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
            hiveDataTypes              = cdt.gpDetails()
            val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
            val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
            val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
            val resultDF               = yearDF.select(allCols:_*)
            val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
            val finalDF                = stringColumns.foldLeft(resultDF) {
              (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
            }
            finalDF
      }
        val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
        val dataDFPart = dataDF.repartition(30)
        dataDFPart.createOrReplaceTempView("preparedDF")
        spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        spark.sql("set hive.exec.dynamic.partition=true")
        spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")
    

    数据被插入到基于 prtn_String_columns: source_system_name, period_year, period_num

    使用Spark提交:

    SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar
    

    在执行器日志中生成以下错误消息:

    Container exited with a non-zero exit code 143.
    Killed by external signal
    18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
    java.lang.OutOfMemoryError: Java heap space
        at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
        at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
        at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
        at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
        at java.util.jar.JarFile.getManifest(JarFile.java:180)
        at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:745)
    

    我在日志中看到,使用如下给定数量的分区正确执行读取:

    Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]
    

    以下是各阶段执行人的情况: enter image description here

    enter image description here

    enter image description here

    enter image description here

    数据分区不正确。一个分区变小,另一个分区变大。这里有一个歪斜的问题。 将数据插入配置单元表时,作业在以下行失败: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") 但我知道这是因为数据倾斜问题。

    我尝试增加执行器的数量,增加执行器内存、驱动程序内存,尝试仅保存为csv文件,而不是将数据帧保存到配置单元表中,但没有任何内容影响执行,因为出现异常:

    java.lang.OutOfMemoryError: GC overhead limit exceeded
    

    代码中有什么需要我更正的地方吗?有人能告诉我怎么解决这个问题吗?

    0 回复  |  直到 6 年前
        1
  •  6
  •   Community miroxlav    5 年前
    1. 根据输入数据量和集群资源,确定需要多少分区。根据经验,除非严格需要,否则最好将分区输入保持在1GB以下。严格小于块大小限制。

      你已经 previously stated 您迁移1TB的数据值,您在不同的帖子(5-70)中使用,这很可能是低,以确保顺利的过程。

      尝试使用不需要进一步的值 repartitioning

    2. 了解你的数据。

      分析数据集中可用的列,以确定是否有任何具有高基数和均匀分布的列要分布在所需数量的分区中。这些是导入过程的良好候选。此外,您还应该确定一个精确的值范围。

      具有不同中心度和偏度度量的聚集体以及直方图和按键的基本计数是很好的探索工具。对于这一部分,最好直接分析数据库中的数据,而不是将其提取到spark中。

      取决于您可以使用的rdbms width_bucket (postgresql,oracle)或类似的函数,以便在加载 partitionColumn , lowerBound , upperBound , numPartitons .

      s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
      FROM t
      GROUP BY bucket) as tmp)"""
      
    3. 如果没有满足上述条件的列,请考虑:

      • 创建一个自定义的并通过公开它。一种观点。多个独立列上的散列通常是很好的候选者。请参考您的数据库手册以确定可以在此处使用的功能( DBMS_CRYPTO 在Oracle中, pgcrypto 在PostgreSQL中)*。
      • 使用一组独立的列,这些列组合在一起提供足够高的基数。

        (可选)如果要写入分区的配置单元表,则应考虑包含配置单元分区列。它可能会限制以后生成的文件数量。

    4. 准备分区参数

      • 如果在前面的步骤中选择或创建的列是数字的( or date / timestamp in Spark >= 2.4 )直接作为 分区列 并使用之前确定的范围值来填充 下限 上界

        如果绑定值不反映数据的属性( min(col) 对于 下限 , max(col) 对于 上界 )它可以导致一个重要的数据倾斜,所以线程小心。在最坏的情况下,当边界不包括数据范围时,所有记录都将由一台机器获取,这使得它不比完全没有分区要好多少。

      • 如果在前面的步骤中选择的列是分类的,或者是一组列,则生成 互斥的 完全覆盖数据的谓词,其格式可以在 SQL where条款。

        例如,如果您有一个列 A 值为{ a1 , a2 , a3 }和列 B 值为{ b1 , b2 , b3 }:

        val predicates = for {
          a <- Seq("a1", "a2", "a3")
          b <- Seq("b1", "b2", "b3")
        } yield s"A = $a AND B = $b"
        

        仔细检查条件是否重叠,是否覆盖了所有组合。如果不满足这些条件,则分别会出现重复或丢失的记录。

        传递数据作为 predicates 论证 jdbc 打电话。注意,分区的数量将完全等于谓词的数量。

    5. 将数据库置于只读模式(任何正在进行的写入操作都可能导致数据不一致。如果可能的话,应该在开始整个过程之前锁定数据库,但是如果不可能的话,在您的组织中)。

    6. 如果分区数与所需的输出加载数据匹配 repartition 并直接转储到接收器,如果不是,则可以尝试按照步骤1中的相同规则重新分区。

    7. 如果仍然遇到任何问题,请确保已正确配置Spark内存和GC选项。

    8. 如果上述任何一项都不起作用:

      • 考虑使用以下工具将数据转储到网络/分发存储 COPY TO 直接从那里读。

        注意,或者标准的数据库实用程序通常需要一个符合posix的文件系统,所以hdfs通常不会这样做。

        这种方法的优点是不需要担心列属性,也不需要将数据置于只读模式以确保一致性。

      • 使用专用的批量传输工具,如apache sqoop,然后重新调整数据。


    * 不要 使用伪列- Pseudocolumn in Spark JDBC .

        2
  •  1
  •   Elmar Macek    6 年前

    根据我的经验,有四种不同的内存设置:

    a)[1]用于存储数据以进行处理的内存vs[2]用于保存程序堆栈的堆空间

    b)[1]驱动程序vs[2]执行程序内存

    到目前为止,通过增加适当的内存,我总能成功地运行spark作业:

    因此,a2-b1将是驱动程序上用于保存程序堆栈的内存。等。

    属性名称如下:

    A1-B1) executor-memory

    A1-B2) driver-memory

    A2-B1) spark.yarn.executor.memoryOverhead

    A2-B2) spark.yarn.driver.memoryOverhead

    请记住,所有*-b1的总和必须小于工作机上的可用内存,而所有*-b2的总和必须小于驱动程序节点上的内存。

    我敢打赌,罪魁祸首是一个大胆标记堆设置。

        3
  •  0
  •   Karthick    6 年前

    你的另一个问题是重复的

     'How to avoid data skewing while reading huge datasets or tables into spark? 
      The data is not being partitioned properly. One partition is smaller while the 
      other one becomes huge on read.
      I observed that one of the partition has nearly 2million rows and 
      while inserting there is a skew in partition. '
    

    如果问题是处理读取后在数据帧中分区的数据,您是否尝试过增加“numpartitions”值?

    .option("numPartitions",50)
    

    lowerBound, upperBound 生成的where子句表达式和numpartitions决定拆分的数量。

    例如,sometable有列id(我们选择它作为 partitionColumn );我们在表中看到的列的值范围- ID 是从1到1000,我们想通过运行 select * from sometable , 所以我们使用lowerbound=1&upperbound=1000和numpartion=4

    这将生成一个4分区的数据框架,其中包含每个查询的结果,方法是基于我们的feed构建sql (lowerbound = 1 & upperbound = 1000 and numpartition = 4)

    select * from sometable where ID < 250
    select * from sometable where ID >= 250 and ID < 500
    select * from sometable where ID >= 500 and ID < 750
    select * from sometable where ID >= 750
    

    如果我们表中的大多数记录都在 ID(500,750) . 这就是你现在的处境。

    当我们增加numpartition时,分裂会进一步发生,这会减少同一分区中的记录量,但是 不是个好机会。

    而不是星星之火 partitioncolumn 基于我们提供的边界,如果您考虑自己提供分割,那么数据可以是均匀的 分裂。您需要切换到另一个jdbc方法,而不是 (lowerbound,upperbound & numpartition) 我们可以提供 直接断言。

    def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 
    

    Link