I am trying to move data from a table in a PostgreSQL database to a Hive table on HDFS. To do that, I came up with the following code:
val conf = new SparkConf().setAppName("Spark-JDBC")
  .set("spark.executor.heartbeatInterval","120s")
  .set("spark.network.timeout","12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.sql.orc.filterPushdown","true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max","512m")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.streaming.stopGracefullyOnShutdown","true")
  .set("spark.yarn.driver.memoryOverhead","7168")
  .set("spark.yarn.executor.memoryOverhead","7168")
  .set("spark.sql.shuffle.partitions", "61")
  .set("spark.default.parallelism", "60")
  .set("spark.memory.storageFraction","0.5")
  .set("spark.memory.fraction","0.6")
  .set("spark.memory.offHeap.enabled","true")
  .set("spark.memory.offHeap.size","16g")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.dynamicAllocation.enabled","true")
  .set("spark.shuffle.service.enabled","true")
val spark = SparkSession.builder().config(conf).master("yarn")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
  val colList = allColumns.split(",").toList
  val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
  val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
  val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
  val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
    .option("user", devUserName).option("password", devPassword)
    .option("partitionColumn","cast_id")
    .option("lowerBound", 1).option("upperBound", 100000)
    .option("numPartitions",70).load()
  val totalCols:List[String] = splitColumns ++ textList
  val cdt = new ChangeDataTypes(totalCols, dataMapper)
  hiveDataTypes = cdt.gpDetails()
  val fc = prepareHiveTableSchema(hiveDataTypes, partition_columns)
  val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
  val resultDF = yearDF.select(allCols:_*)
  val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
  val finalDF = stringColumns.foldLeft(resultDF) {
    (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
  }
  finalDF
}
val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
val dataDFPart = dataDF.repartition(30)
dataDFPart.createOrReplaceTempView("preparedDF")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")将数据插入到基于prtn_String_columns: source_system_name, period_year, period_num的动态分区的单元表中。
The spark-submit command used:
SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 \
  --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar \
  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar \
  --num-executors 80 --executor-cores 5 --executor-memory 50G \
  --driver-memory 20G --driver-cores 3 \
  --class com.partition.source.YearPartition splinter_2.11-0.1.jar \
  --master=yarn --deploy-mode=cluster \
  --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar
The following error messages are produced in the executor logs:
Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
at java.util.jar.JarFile.getManifest(JarFile.java:180)
at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
I can see in the logs that the read is being executed properly with the given number of partitions, as shown below:
Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]
Below is the state of the executors, stage by stage:
[Screenshots of the executor stages from the Spark UI]
The data is not being partitioned properly. One partition is smaller while another one becomes huge. There is a data-skew problem here. When inserting the data into the Hive table, the job fails at the line spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF"), but I understand this happens because of the data-skew problem.
I tried increasing the number of executors, increasing executor memory and driver memory, and tried just saving the data as CSV files instead of writing it into the Hive table, but nothing changes the outcome; the exception is:
java.lang.OutOfMemoryError: GC overhead limit exceeded
Is there anything in the code that I need to correct? Could anyone tell me how I can fix this problem?
Posted on 2018-10-06 10:20:50
Determine how many partitions you need up front; the same value can be used later when repartitioning the data loaded from the JDBC source. Then use width_bucket (PostgreSQL, Oracle) or an equivalent function to get a rough idea of how the data will be distributed in Spark after loading with partitionColumn, lowerBound, upperBound and numPartitions (see the sketch after this list):
s"""(SELECT width_bucket($partitionColumn, $lowerBound, $upperBound, $numPartitions) AS bucket, COUNT(*)
     FROM t
     GROUP BY bucket) AS tmp"""
If there is no column with a suitable distribution, consider:
- Creating a custom one and exposing it via a view. Hashes over multiple independent columns are usually good candidates. Please consult your database manual to determine functions that can be used here (`DBMS_CRYPTO` in Oracle, `pgcrypto` in PostgreSQL)*.
- Using a set of independent columns which, taken together, provide high enough cardinality. Optionally, if you are going to write into a partitioned Hive table, you should consider including the Hive partitioning columns; it might limit the number of files generated later.
- If the column selected or created in the previous steps is numeric ([or date / timestamp in Spark >= 2.4](https://issues.apache.org/jira/browse/SPARK-22814)), provide it directly as the `partitionColumn` and use the range values determined before to fill `lowerBound` and `upperBound` (the sketch after this list shows one way to derive them from the actual min / max). If the bound values don't reflect the properties of the data (`min(col)` for `lowerBound`, `max(col)` for `upperBound`), it can result in significant data skew, so proceed carefully. In the worst-case scenario, when the bounds don't cover the range of the data, all records will be fetched by a single machine, which is no better than no partitioning at all.
- If the column selected in the previous steps is categorical, or is a set of columns, generate a list of mutually exclusive predicates that fully cover the data, in a form that can be used in a SQL `WHERE` clause.
For example, if column `A` has values {a1, a2, a3} and column `B` has values {b1, b2, b3}:
val predicates = for {
  a <- Seq("a1", "a2", "a3")
  b <- Seq("b1", "b2", "b3")
} yield s"A = $a AND B = $b"
Double-check that the conditions don't overlap and that all combinations are covered. If these conditions are not satisfied, you end up with duplicate or missing records respectively.
Pass the data as the `predicates` argument of the `jdbc` call. Note that the number of partitions will be exactly equal to the number of predicates.
- If the desired number of output partitions matches the data as loaded, dump it directly to the sink; otherwise you can try repartitioning, following the same rules as in step 1.
- Consider dumping your data to network / distributed storage using tools like `COPY TO` and reading it directly from there. Note that with `COPY TO` or standard database utilities you will typically need a POSIX-compliant file system, so HDFS usually won't do.
The advantage of this approach is that you don't need to worry about the column properties, and there is no need to put the data in a read-only mode to ensure consistency.
- Use dedicated bulk transfer tools (like Apache Sqoop), and reshape the data afterwards.
* Don't use pseudocolumns - see Pseudocolumn in Spark JDBC.
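To make the distribution check above concrete, here is a minimal sketch that pushes the width_bucket query down through the JDBC source and derives lowerBound / upperBound from the actual min / max of the column. It assumes the spark session, connectionUrl, devUserName and devPassword from the question and reuses the question's cast_id column and schema.tablename purely as placeholders; everything else is illustrative, not a definitive implementation.

import java.util.Properties

// Assumes spark, connectionUrl, devUserName and devPassword from the question.
val props = new Properties()
props.setProperty("user", devUserName)
props.setProperty("password", devPassword)
props.setProperty("driver", "org.postgresql.Driver")

// 1. Derive lowerBound / upperBound from the real data instead of guessing.
val bounds = spark.read.jdbc(
  connectionUrl,
  "(SELECT min(cast_id) AS lo, max(cast_id) AS hi FROM schema.tablename) AS b",
  props
).collect()(0)
val lowerBound = bounds.getAs[Number]("lo").longValue()
val upperBound = bounds.getAs[Number]("hi").longValue()

// 2. Check how evenly numPartitions stride-based buckets would be filled.
val numPartitions = 70
val distribution = spark.read.jdbc(
  connectionUrl,
  s"""(SELECT width_bucket(cast_id, $lowerBound, $upperBound, $numPartitions) AS bucket,
              count(*) AS cnt
       FROM schema.tablename
       GROUP BY bucket) AS tmp""",
  props
)
distribution.orderBy("bucket").show(numPartitions, truncate = false)

If the bucket counts come back very uneven, that is exactly the skew described above, and a hashed or composite column, or hand-built predicates, is the better route.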
Posted on 2018-10-08 13:31:18
In my experience, there are 4 different kinds of memory settings:
A) [1] memory used to store the data being processed versus [2] heap space used to hold the program stack
B) [1] executor memory versus [2] driver memory
Up to now I was always able to get my Spark jobs running successfully by increasing the appropriate kind of memory:
A2-B1 would therefore be the memory available on the executors to hold the program stack, and so on.
The property names are as follows:
A1-B1) executor-memory
A1-B2) driver-memory
A2-B1) spark.yarn.executor.memoryOverhead
A2-B2) spark.yarn.driver.memoryOverhead
Keep in mind that the sum of all *-B1 values must be less than the memory available on your workers, and the sum of all *-B2 values must be less than the memory on your driver node.
My bet is that the culprit is one of the heap settings marked in bold.
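For concreteness, a minimal sketch of where those four knobs live, expressed in the same SparkConf style as the question; the property names are real Spark-on-YARN settings, but the sizes are arbitrary examples, not recommendations.

import org.apache.spark.SparkConf

// Using the answer's A/B classification: A1 = memory for the data being processed,
// A2 = overhead (program stack etc.), B1 = executor side, B2 = driver side.
// The sizes below are arbitrary examples.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")                 // A1-B1, same knob as --executor-memory
  .set("spark.driver.memory", "4g")                   // A1-B2, same knob as --driver-memory
  .set("spark.yarn.executor.memoryOverhead", "2048")  // A2-B1, value in MB
  .set("spark.yarn.driver.memoryOverhead", "1024")    // A2-B2, value in MB

In practice A1-B1 and A1-B2 are usually passed as --executor-memory and --driver-memory on spark-submit, as in the question's command, while the two overhead settings are passed via --conf or SparkConf as shown.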
Posted on 2018-10-07 08:11:27
Another question of yours is a duplicate of this one:
'How to avoid data skewing while reading huge datasets or tables into spark?
The data is not being partitioned properly. One partition is smaller while the
other one becomes huge on read.
I observed that one of the partition has nearly 2million rows and
while inserting there is a skew in partition.'
If the issue is with how the data is partitioned for processing after the read, have you played around with increasing the "numPartitions" value?
.option("numPartitions",50)lowerBound, upperBound窗体分区为生成的WHERE子句表达式和the分区跨步确定拆分的数量。
For example, suppose sometable has a column ID, which we choose as the partitionColumn; the value range of the ID column that we see in the table goes from 1 to 1000, and we want to get all the records by running select * from sometable, so we go with lowerBound = 1, upperBound = 1000 and numPartitions = 4.
By building SQL from the values we fed in (lowerBound = 1, upperBound = 1000 and numPartitions = 4), this will produce a DataFrame of 4 partitions, each holding the result of one of these queries:
select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750
What if most of the records in our table fall within the ID range (500, 750)? That is the situation you are in.
When we increase numPartitions, the splitting goes further and that reduces the volume of records in each partition, but this is not a great shot.
Instead of Spark splitting the partitionColumn based on the boundaries we provide, if you feed the split points yourself the data can be split evenly. You need to switch to another JDBC method where, instead of (lowerBound, upperBound & numPartitions), we can provide predicates directly:
def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame

https://stackoverflow.com/questions/52603131
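A minimal sketch of that variant, reusing the hypothetical sometable / ID example above and assuming the spark session, connectionUrl, devUserName and devPassword from the question; the exact ranges are made up purely to show how hand-built, mutually exclusive predicates replace lowerBound / upperBound / numPartitions.

import java.util.Properties

// Assumes spark, connectionUrl, devUserName and devPassword from the question;
// "sometable" and the ID ranges are hypothetical.
val props = new Properties()
props.setProperty("user", devUserName)
props.setProperty("password", devPassword)
props.setProperty("driver", "org.postgresql.Driver")

// Mutually exclusive, fully covering ranges: the dense ID range (500, 750)
// is cut more finely so every partition ends up with a similar row count.
val predicates = Array(
  "ID < 500",
  "ID >= 500 AND ID < 600",
  "ID >= 600 AND ID < 700",
  "ID >= 700 AND ID < 750",
  "ID >= 750"
)

// One Spark partition per predicate, five partitions here.
val df = spark.read.jdbc(connectionUrl, "sometable", predicates, props)

Each predicate becomes exactly one Spark partition, so the dense ranges can be cut more finely and the sparse ones more coarsely, which is what evens out the skew.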