首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何优化从JDBC源迁移数据时的分区?

如何优化从JDBC源迁移数据时的分区?
EN

Stack Overflow用户
提问于 2018-10-02 06:38:23
回答 3查看 12K关注 0票数 14

我正在尝试将数据从PostgreSQL表中的表移动到HDFS上的Hive表。为此,我提出了以下代码:

代码语言:javascript
复制
  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

将数据插入到基于prtn_String_columns: source_system_name, period_year, period_num的动态分区的单元表中。

火花-提交所用:

代码语言:javascript
复制
SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

在executor日志中生成以下错误消息:

代码语言:javascript
复制
Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

我在日志中看到,在给定的分区数下,正在正确地执行读,如下所示:

代码语言:javascript
复制
Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

以下是执行者分阶段的状况:

数据没有被正确地划分。一个分区更小,而另一个分区变得很大。这里有一个偏斜的问题。当将数据插入Hive表时,作业在行:spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")处失败,但我理解这是由于数据倾斜问题而发生的。

我试图增加执行程序的数量,增加执行程序内存,驱动程序内存,试图只将数据存储为csv文件,而不是将数据存储到Hive表中,但不影响异常的执行:

代码语言:javascript
复制
java.lang.OutOfMemoryError: GC overhead limit exceeded

代码中有什么我需要修正的吗?有人能告诉我怎么解决这个问题吗?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-10-06 10:20:50

  1. 确定需要多少分区,给定输入数据量和群集资源。根据经验,除非严格需要,否则最好将分区输入保持在1GB以下。而且严格地小于块的大小限制。 您已经使用先前所述迁移了在不同帖子(5-70)中使用的1TB数据值,这很可能会降低数据的处理速度。 尝试使用不需要进一步repartitioning的值。
  2. 了解你的数据。 分析数据集中可用的列,以确定是否有任何具有高基数和均匀分布的列分布在所需的分区数之间。这些都是一个重要过程的好候选者。此外,您还应该确定一个确切的值范围。 具有不同中心度和偏斜度度量的集合以及直方图和基本按键计数是很好的勘探工具。对于这一部分,最好是直接分析数据库中的数据,而不是将其获取到Spark。 根据关系数据库管理系统的不同,您可以使用width_bucket (PostgreSQL,Oracle)或类似的函数来了解在加载partitionColumnlowerBoundupperBoundnumPartitons之后如何在Spark中分发数据。 S“”(选择width_bucket($partitionColum,$lowerBound,$upperBound,$numPartitons)作为桶,计数(*)从t组逐桶)作为tmp)。
  3. 如果没有符合上述标准的列,请考虑:
代码语言:javascript
复制
- Creating a custom one and exposing it via. a view. Hashes over multiple independent columns are usually good candidates. Please consult your database manual to determine functions that can be used here (`DBMS_CRYPTO` in Oracle, `pgcrypto` in PostgreSQL)\*.
- Using a set of independent columns which taken together provide high enough cardinality.

可以选择,如果要写入分区的Hive表,则应该考虑包括Hive分区列。它可能会限制以后生成的文件的数量。

  1. 准备分区参数
代码语言:javascript
复制
- If column selected or created in the previous steps is numeric ([or date / timestamp in Spark >= 2.4](https://issues.apache.org/jira/browse/SPARK-22814)) provide it directly as the `partitionColumn` and use range values determined before to fill `lowerBound` and `upperBound`.

如果绑定值没有反映数据的属性(min(col)表示lowerBoundmax(col)表示upperBound),那么它会导致非常小心的数据倾斜。在最坏的情况下,当界限不包括数据的范围时,所有记录都将由一台机器来获取,这并不比没有分区更好。

-如果前面步骤中选择的列是绝对的,或者是一组列,则生成一个完全覆盖数据的互斥谓词列表,其形式可以在SQL where子句中使用。

例如,如果列A具有值{a1a2a3},而列B具有值{b1b2b3}:

val谓词= for {a <- Seq("a1“、"a2”、"a3") b <- Seq("b1“、"b2”、"b3") }产生s"A = $a和B= $b“

二次检查条件不重叠,所有组合都包括在内。如果不满足这些条件,您将分别得到重复或丢失的记录。

将数据作为predicates参数传递给jdbc调用。注意,分区的数量将与谓词的数量完全相等。

  1. 将数据库置于只读模式(任何正在进行的写入都会导致数据不一致。如果可能的话,您应该在启动整个过程之前锁定数据库,但如果不可能,则在您的组织中锁定)。
  2. 如果分区的数量与所需的不带repartition的输出加载数据相匹配,并直接转储到接收器,则可以尝试按照步骤1中的相同规则重新分区。
  3. 如果您仍然遇到任何问题,请确保您已经正确配置了Spark和GC选项。
  4. 如果上述任何一项都不起作用:
代码语言:javascript
复制
- Consider dumping your data to a network / distributes storage using tools like `COPY TO` and read it directly from there.

注意,或者标准数据库实用程序,您通常需要一个符合POSIX的文件系统,所以HDFS通常不会这样做。

这种方法的优点是您不需要担心列属性,也不需要将数据放在只读模式中,以确保一致性。

-使用专用的大容量传输工具(如Apache ),然后对数据进行整形。

*不要使用假古兰经- Spark中的伪

票数 20
EN

Stack Overflow用户

发布于 2018-10-08 13:31:18

根据我的经验,有四种不同的内存设置:

( A)由于处理原因而存储数据的1内存,用于保存程序堆栈的堆空间(2)

( B) 1个驱动程序对2个执行器内存

到目前为止,我总是能够通过增加适当类型的内存来成功地运行我的星火作业:

因此,A2-B1将是驱动程序上可用的内存,用于保存程序堆栈。等。

财产名称如下:

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

请记住,all *-B1的总和必须小于工作人员上的可用内存,而all *-B2的总和必须小于驱动器节点上的内存。

我打赌,罪魁祸首是其中一个大胆标记的堆设置。

票数 1
EN

Stack Overflow用户

发布于 2018-10-07 08:11:27

你的另一个问题在这里是重复的。

代码语言:javascript
复制
 'How to avoid data skewing while reading huge datasets or tables into spark? 
  The data is not being partitioned properly. One partition is smaller while the 
  other one becomes huge on read.
  I observed that one of the partition has nearly 2million rows and 
  while inserting there is a skew in partition. '

如果问题是在读取后处理数据中分区的数据,那么您是否尝试过增加"numPartitions“值?

代码语言:javascript
复制
.option("numPartitions",50)

lowerBound, upperBound窗体分区为生成的WHERE子句表达式和the分区跨步确定拆分的数量。

例如,sometable有列- ID (我们选择它为partitionColumn);我们在表中看到的列-ID的值范围是从1到1000,我们希望通过运行select * from sometable获得所有记录,所以我们使用了小写值=1&上界= 1000和num分区=4。

通过构建基于提要(lowerbound = 1 & upperbound = 1000 and numpartition = 4)的sql,这将生成一个由4个分区组成的数据格式,其中包含每个查询的结果。

代码语言:javascript
复制
select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750

如果我们表中的大多数记录都在ID(500,750)的范围内呢?这就是你所处的处境。

当我们增加数字分区时,分割会进一步发生,从而减少同一分区中的记录量,但这不是一个好机会。

与我们提供的基于边界的火花分裂partitioncolumn不同的是,如果您考虑自己来填充拆分,那么数据可以被均匀地分割。您需要切换到另一个JDBC方法,在该方法中,我们可以直接提供谓词而不是(lowerbound,upperbound & numpartition)

代码语言:javascript
复制
def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 

链接

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52603131

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档