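I'm new to PySpark. I'm trying to get the latest partition (a date partition) of a Hive table using PySpark DataFrames, as shown below. I believe there is a better way to do this with DataFrame functions (rather than writing SQL). Could you share a better approach?
The solution below works, but it gets the answer by scanning all of the data in the Hive table.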
df_1 = sqlContext.table("dbname.tablename");
df_1_dates = df_1.select('partitioned_date_column').distinct().orderBy(df_1['partitioned_date_column'].desc())
lat_date_dict=df_1_dates.first().asDict()
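lat_dt=lat_date_dict['partitioned_date_column']
Answered on 2019-08-10 16:59:04
I agree with what @philantrovert mentioned in the comments. You can filter on the partition column so that partition pruning applies, as shown below, which limits the number of partitions scanned for your Hive table.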
>>> spark.sql("""show partitions test_dev_db.newpartitiontable""").show();
+--------------------+
|           partition|
+--------------------+
|tran_date=2009-01-01|
|tran_date=2009-02-01|
|tran_date=2009-03-01|
|tran_date=2009-04-01|
|tran_date=2009-05-01|
|tran_date=2009-06-01|
|tran_date=2009-07-01|
|tran_date=2009-08-01|
|tran_date=2009-09-01|
|tran_date=2009-10-01|
|tran_date=2009-11-01|
|tran_date=2009-12-01|
+--------------------+
>>> max_date=spark.sql("""show partitions test_dev_db.newpartitiontable""").rdd.flatMap(lambda x:x).map(lambda x : x.replace("tran_date=","")).max()
>>> print(max_date)
2009-12-01
>>> query = "select city,state,country from test_dev_db.newpartitiontable where tran_date ='{}'".format(max_date)
>>> spark.sql(query).show();
+--------------------+----------------+--------------+
|                city|           state|       country|
+--------------------+----------------+--------------+
|         Southampton|         England|United Kingdom|
|W Lebanon        ...|              NH| United States|
|               Comox|British Columbia|        Canada|
|           Gasperich|      Luxembourg|    Luxembourg|
+--------------------+----------------+--------------+
>>> spark.sql(query).explain(True)
== Parsed Logical Plan ==
'Project ['city, 'state, 'country]
+- 'Filter ('tran_date = 2009-12-01)
   +- 'UnresolvedRelation `test_dev_db`.`newpartitiontable`
== Analyzed Logical Plan ==
city: string, state: string, country: string
Project [city#9, state#10, country#11]
+- Filter (tran_date#12 = 2009-12-01)
   +- SubqueryAlias newpartitiontable
      +- Relation[city#9,state#10,country#11,tran_date#12] orc
== Optimized Logical Plan ==
Project [city#9, state#10, country#11]
+- Filter (isnotnull(tran_date#12) && (tran_date#12 = 2009-12-01))
   +- Relation[city#9,state#10,country#11,tran_date#12] orc
== Physical Plan ==
*(1) Project [city#9, state#10, country#11]
+- *(1) FileScan orc test_dev_db.newpartitiontable[city#9,state#10,country#11,tran_date#12] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(tran_date#12), (tran_date#12 = 2009-12-01)], PushedFilters: [], ReadSchema: struct<city:string,state:string,country:string>
You can see PartitionCount: 1 in the plan above: only one of the 12 available partitions was scanned.
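The replace("tran_date=", "") step above works because the table has a single partition column. As a minimal sketch, assuming the partition strings have already been collected to the driver, the helpers below parse multi-level specs such as country=usa/region=ri in plain Python. The names parse_partition_spec and latest_partition_value are hypothetical and not part of Spark. Values are compared as strings, which gives the right order for ISO-style dates like 2009-12-01 but not for every format.

```python
def parse_partition_spec(spec):
    """Split a spec like 'country=usa/region=ri' into a dict of column -> value."""
    result = {}
    for part in spec.strip("/").split("/"):
        if not part:
            continue  # skip empty segments, e.g. from an empty spec
        key, sep, value = part.partition("=")
        if sep:  # ignore segments that are not of the form key=value
            result[key] = value
    return result


def latest_partition_value(specs, column):
    """Return the largest value of `column` across all specs, or None if absent."""
    values = [parse_partition_spec(s).get(column) for s in specs]
    values = [v for v in values if v is not None]
    # String comparison: correct for zero-padded yyyy-MM-dd dates.
    return max(values) if values else None


specs = ["tran_date=2009-01-01", "tran_date=2009-12-01", "tran_date=2009-06-01"]
print(latest_partition_value(specs, "tran_date"))
```

With a live session, the input list could presumably be built from the rows returned by spark.sql("show partitions test_dev_db.newpartitiontable").collect(), reading the partition field of each row. The resulting value can then be used in a filter on the partition column so that pruning applies as in the plan above.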
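Answered on 2021-01-15 01:40:16
Building on Vikrant's answer, here is a more general way to extract partition column values directly from the table metadata, which avoids having Spark scan all the files in the table.
First, if your data is not already registered in the catalog, you need to register it so that Spark can see the partition details. Here, I register a new table named data.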
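spark.catalog.createTable(
    'data',
    path='/path/to/the/data',
    source='parquet',
)
spark.catalog.recoverPartitions('data')
partitions = spark.sql('show partitions data')
However, to keep this answer self-contained, I will create the partitions DataFrame by hand so you can see what it looks like, along with the solution for extracting a specific column's value from it.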
from pyspark.sql.functions import (
    col,
    regexp_extract,
)
partitions = (
    spark.createDataFrame(
        [
            ('/country=usa/region=ri/',),
            ('/country=usa/region=ma/',),
            ('/country=russia/region=siberia/',),
        ],
        schema=['partition'],
    )
)
partition_name = 'country'
(
    partitions
    .select(
        'partition',
        regexp_extract(
            col('partition'),
            pattern=r'(\/|^){}=(\S+?)(\/|$)'.format(partition_name),
            idx=2,
        ).alias(partition_name),
    )
    .show(truncate=False)
)
The output of this query is:
+-------------------------------+-------+
|partition |country|
+-------------------------------+-------+
|/country=usa/region=ri/ |usa |
|/country=usa/region=ma/ |usa |
|/country=russia/region=siberia/|russia |
+-------------------------------+-------+
The solution in Scala looks very similar, except that the call to regexp_extract() is slightly different:
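.select(
    regexp_extract(
        col("partition"),
        exp=s"(\\/|^)${partitionName}=(\\S+?)(\\/|$$)",
        groupIdx=2
    ).alias(partitionName).as[String]
)
Again, the benefit of querying partition values this way is that Spark does not scan all the files in the table to get the answer. If you have a table with tens or hundreds of thousands of files, the time saved will be significant.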
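To see how the pattern used above behaves without starting Spark, here is a small sketch that applies the same regular expression with Python's re module. Spark's regexp_extract uses Java regular expressions; for this particular pattern the two engines should agree, but treat that as an assumption to verify. The helper name extract_partition_value is hypothetical, and re.escape is an addition not present in the original pattern.

```python
import re


def extract_partition_value(partition, name):
    """Return the value of partition column `name`, or '' when it is absent.

    Returning '' on no match mirrors what regexp_extract does in Spark.
    """
    # Group 1: a leading '/' or start of string, so 'try' cannot match inside 'country'.
    # Group 2: the value, matched non-greedily up to the next '/' or end of string.
    pattern = r'(\/|^){}=(\S+?)(\/|$)'.format(re.escape(name))
    match = re.search(pattern, partition)
    return match.group(2) if match else ''


for p in ['/country=usa/region=ri/', 'country=russia/region=siberia']:
    print(extract_partition_value(p, 'country'), extract_partition_value(p, 'region'))
```

The leading (\/|^) group is what keeps a short column name from matching the tail of a longer one, and the non-greedy \S+? stops the value at the first following slash rather than swallowing the rest of the path.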
https://stackoverflow.com/questions/55053218