
How to optimize partitioning when migrating data from a JDBC source?


      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{DataFrame, SparkSession}
      import org.apache.spark.sql.functions.{col, regexp_replace}
      import org.apache.spark.sql.types.StringType
      import scala.collection.mutable.ListBuffer

      val conf = new SparkConf().setAppName("Spark-JDBC")
        .set("spark.executor.heartbeatInterval", "120s")
        .set("spark.network.timeout", "12000s")
        .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
        .set("spark.sql.orc.filterPushdown", "true")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.max", "512m")
        .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
        .set("spark.streaming.stopGracefullyOnShutdown", "true")
        .set("spark.yarn.driver.memoryOverhead", "7168")
        .set("spark.yarn.executor.memoryOverhead", "7168")
        .set("spark.sql.shuffle.partitions", "61")
        .set("spark.default.parallelism", "60")
        .set("spark.memory.storageFraction", "0.5")
        .set("spark.memory.fraction", "0.6")
        .set("spark.memory.offHeap.enabled", "true")
        .set("spark.memory.offHeap.size", "16g")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")

      val spark = SparkSession.builder().config(conf).master("yarn")
        .enableHiveSupport()
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .getOrCreate()
      def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
            val colList                = allColumns.split(",").toList
            val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
            val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
            val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
            val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                          .option("user", devUserName).option("password", devPassword)
                                                                          .option("partitionColumn","cast_id")
                                                                          .option("lowerBound", 1).option("upperBound", 100000)
                                                                          .option("numPartitions",70).load()
            val totalCols:List[String] = splitColumns ++ textList
            val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
            hiveDataTypes              = cdt.gpDetails()
            val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
            val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
            val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
            val resultDF               = yearDF.select(allCols:_*)
            val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
            val finalDF                = stringColumns.foldLeft(resultDF) {
              (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
            }
            finalDF
      }
        val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
        val dataDFPart = dataDF.repartition(30)
        dataDFPart.createOrReplaceTempView("preparedDF")
        spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        spark.sql("set hive.exec.dynamic.partition=true")
        spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")
    

The data is inserted into the Hive table, dynamically partitioned based on prtn_String_columns: source_system_name, period_year, period_num.
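
prtn_String_columns itself is not defined in the snippet above; presumably it is just the comma-separated list of the dynamic partition column names. A minimal sketch of that assumption:

    // Assumption: prtn_String_columns is derived from partition_columns and is not
    // shown in the original code.
    val partition_columns   = Array("source_system_name", "period_year", "period_num")
    val prtn_String_columns = partition_columns.mkString(",")
    // yields "source_system_name,period_year,period_num", which is what the
    // PARTITION(${prtn_String_columns}) clause in the INSERT above expands to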

The spark-submit command used:

    SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 \
      --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar \
      --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar \
      --num-executors 80 --executor-cores 5 --executor-memory 50G \
      --driver-memory 20G --driver-cores 3 \
      --class com.partition.source.YearPartition splinter_2.11-0.1.jar \
      --master=yarn --deploy-mode=cluster \
      --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM \
      --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
      --name Splinter \
      --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar
    

The following error message appears in the executor logs:

    Container exited with a non-zero exit code 143.
    Killed by external signal
    18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
    java.lang.OutOfMemoryError: Java heap space
        at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
        at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
        at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
        at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
        at java.util.jar.JarFile.getManifest(JarFile.java:180)
        at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:745)
    

I can see in the logs that the read is being executed properly with the given number of partitions, as shown below:

    Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]
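
For context, here is a simplified sketch (not the exact Spark source) of how Spark 2.x turns lowerBound, upperBound and numPartitions into per-partition predicates on cast_id. Rows whose cast_id falls outside the [lowerBound, upperBound] range are not filtered out; they all land in the first or last partition, which by itself can produce the kind of skew described below:

    // Simplified illustration of the per-partition WHERE clauses for a partitioned
    // JDBC read (the real logic lives in Spark's JDBCRelation.columnPartition).
    val lowerBound    = 1L
    val upperBound    = 100000L
    val numPartitions = 70
    val stride = (upperBound / numPartitions) - (lowerBound / numPartitions)

    val predicates = (0 until numPartitions).map { i =>
      val start = lowerBound + i * stride
      val end   = start + stride
      if (i == 0)                      s"cast_id < $end OR cast_id IS NULL"
      else if (i == numPartitions - 1) s"cast_id >= $start"
      else                             s"cast_id >= $start AND cast_id < $end"
    }
    predicates.take(3).foreach(println)

If the distribution of cast_id is far from uniform, the spark.read.jdbc overload that accepts an explicit Array[String] of predicates can be used to hand-craft the ranges instead of relying on an even stride.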
    


The data is not partitioned properly: some partitions end up tiny while others become huge, so there is a skew problem here. When inserting the data into the Hive table, the job fails at the following line:

    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")
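
One way to make the skew visible (a diagnostic sketch, not part of the original job) is to print how many rows land in each partition of the JDBC read before the repartition(30):

    // Diagnostic sketch (hypothetical): count the rows per partition of dataDF
    // to confirm which JDBC partitions are oversized.
    dataDF.rdd
      .mapPartitionsWithIndex { (idx, rows) => Iterator((idx, rows.size)) }
      .collect()
      .sortBy(_._1)
      .foreach { case (idx, cnt) => println(s"partition $idx -> $cnt rows") }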

I tried increasing the number of executors, increasing executor memory and driver memory, and I also tried saving the data as CSV files instead of writing the DataFrame to the Hive table (sketched after the exception below), but none of this helps; the execution still fails with the exception:

    java.lang.OutOfMemoryError: GC overhead limit exceeded
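
For reference, a minimal sketch of the CSV fallback mentioned above, assuming a hypothetical output path that is not taken from the original job; per the description, this variant fails with the same kind of OutOfMemoryError:

    // Hypothetical reconstruction of the CSV fallback described above.
    // The output path "/tmp/year2017_csv" is an assumption.
    dataDFPart.write
      .mode("overwrite")
      .option("header", "true")
      .csv("/tmp/year2017_csv")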
    
