要在spark中加载和分区传入的数据,我使用以下语法。
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()参数partitionColumn、lowerBound、upperBound、numPartitions用于优化作业的性能。
我有一个包含1000条记录的表&一个序列号从1到1000的整数列。我首先在该列上运行min和max,将min值分配给lowerBound,将max值分配给upperBound。numPartitions参数被指定为3,这样传入的数据被均匀地分割成3个不同的分区(或者接近于偶数)。
当数据较少时,上面的设计效果很好。但我有一个场景,如下所示。
我有一个包含2030亿条记录的表,其中没有包含唯一/序列整数的整数列。然后是一个日期列,其数据分布在5年内,即2016-2021年。为了更快地移动数据,我每次都移动每年一个月的数据。这是我正在使用的查询:
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"因此,上面的查询变成:select * from table where date_column >= '2016-01-01' and date_time <= '2016-01-31 23:59:59.999'',依此类推,每一年的每个月的第一天和最后一天。
这是对我的循环的粗略描述:
(2016 to 2021) { year =>
(1 to 12) { month =>
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
}
}为了找出界限,我使用了相同的月份和年份过滤器,如下所示:
val bounds = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", "(select min(partitionColumn) as mn, max(partitionColum) as from tablename where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as boundsDF")
.load()
val lowerBound_value = bounds.select("mn").head.getInt(0)
val upperBound_value = bounds.select("mx").head.getInt(0)这里的问题是找到过滤数据的下界和上界。由于数据量巨大,使用给定过滤器在partitionColumn上运行最小和最大值的查询花费的时间比将实际数据帧写入hdfs要多得多。
我尝试在那里提供随机值,但在任务运行时观察到分区中的数据倾斜。
是否必须给出partitionColumn的最小和最大值作为更好的数据分布的下限和上界?如果不是,有没有办法指定下限和上限,而不是对数据运行min & max查询?
任何帮助都是非常感谢的。
发布于 2021-06-14 14:22:20
对于200+十亿行,我真的希望您的表在DB中与您访问数据的日期列相同的日期列上进行分区。如果没有这一点,查询将是非常无望的。
但是您有没有尝试过日期/时间戳值的上下限的整数等效值?查看this reference了解Spark将整数值转换为时间戳。
将JDBC选项lowerBound和upperBound转换为TimestampType值,方法与将字符串转换为TimestampType值/DateType值的方式相同。该转换基于公历和由SQL config spark.sql.session.timeZone定义的时区。在Spark版本2.4及更低版本中,转换基于混合日历(儒略历+公历)和默认系统时区。
正如您所提到的,这里没有可以使用的预先存在的整型列。因此,在您的循环中,上下限是静态的,因此可以转换为静态的上下限数值。根据Spark的内部结构,将下限和上限值划分为数值范围和multiple queries are thrown to DB,以获取每个查询的单个分区的数据。这也意味着,在相关列上进行表分区或在源数据库中具有适当的索引对于性能而言非常重要。
您需要确保在所提供的查询中适当地放置上下限的占位符。作为提示,实际数值可能会因所使用的数据库系统而异。如果出现这种情况,即数据库系统到日期的整数转换不同,那么您将需要提供数据库接受的值,而不是Spark。来自same docs
参数: connectionFactory -返回开放连接的工厂。RDD负责关闭连接。sql -查询的文本。查询必须包含两个?用于对结果进行分区的参数占位符。例如,
选择书名,作者从哪里?<= id和id <=?lowerBound -第一个占位符的最小值upperBound -第二个占位符的最大值下限和上限都包含在内。
..。
同样,很明显,<=和>=也被利用了,因此上下限都包含在内;这是我在其他问题上观察到的一个混淆之处。
https://stackoverflow.com/questions/67918638
复制相似问题