我正在从mongodb集合中提取数据,并使用Spark python代码将其写入bigquery表。
下面是我的代码片段:
df = spark.read\
.format("com.mongodb.spark.sql.DefaultSource")\
.option("uri","mongodb_url")\
.option("database","db_name")\
.option("collection", "collection_name")\
.load()
df.write \
.format("bigquery") \
.mode("append")\
.option("temporaryGcsBucket","gcs_bucket") \
.option("createDisposition","CREATE_IF_NEEDED")\
.save("bq_dataset_name.collection_name")这将从mongodb集合中提取所有数据。但我只想提取满足条件的文档(如sql查询中的where条件)。
我发现的一种方法是读取dataframe中的整个数据,并对该dataframe使用过滤器,如下所示:
df2 = df.filter(df['date'] < '12-03-2020 10:12:40')但是由于我的源mongo集合有8-10 Gb的数据,我不能每次都从mongo读取整个数据。
如何在使用spark.read从mongo读取数据时使用过滤?
发布于 2021-05-17 16:02:57
您是否尝试过检查是否在应用过滤器后仍在扫描您的整个数据?
假设您使用的是带有spark的official connector,则支持过滤器/谓词下推。
“谓词下推”是一种从连接器和催化剂优化器到数据节点自动“下推”谓词的优化。目标是在将数据加载到Spark的节点内存之前,最大限度地提高数据存储端过滤出的数据量。
连接器自动将两种谓词下推到MongoDB:
将子句内容(projections)作为一个或多个投影内容( $project
select filter content,where)作为一个或多个投影您可以找到此over here的支持代码。
注意:嵌套字段上的谓词下推存在一些问题,但这是spark本身的一个错误,也会影响其他来源。该问题已在Spark 3.x中修复。检查this answer。
https://stackoverflow.com/questions/66260362
复制相似问题