我正在尝试读取拼图文件并将其转储到mongodb集合(切分)中。当我不进行切分时,写吞吐量确实很好。但在切分之后,它已经大幅度下降。
一项任务需要超过30分钟,只需处理16 mb数据。

我正在使用下面的
(
SparkConf()
.setMaster("yarn")
.set("spark.executor.memory", "30g")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", "5")
.set("spark.sql.shuffle.partitions", "2000")
.set("spark.network.timeout", "800")
.set("spark.sql.broadcastTimeout", "1200")
.set("spark.default.parallelism", "2000")
.set('spark.jars', './mongo*.jar')
.set("spark.mongodb.input.uri", mongo_uri)
.set("spark.mongodb.input.database", db)
.set("spark.mongodb.input.collection", db_collection)
.set("spark.mongodb.output.uri", mongo_uri)
.set("spark.mongodb.output.database", db)
.set("spark.mongodb.output.collection", db_collection)
.set("spark.mongodb.input.partitionerOptions.partitionKey", shard_key)
.set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
.set("spark.mongodb.input.partitionerOptions.shardkey", shard_key)
)我想抛出200亿多个记录,8个小时后,它只插入了大约8亿份文档。
文档大小相同,每个文档有250 KB。
没有使用其他索引。
发布于 2022-02-17 06:57:44
以防别人卷入这件事。由于某些原因,在写入mongo时不使用火花会话级mongo配置。
要克服这一问题,在编写步骤中显式地给出mongo配置。
mongo_conf = {
'uri': mongo_uri,
'database': db,
'maxBatchSize': 100000,
'forceInsert': True,
'ordered': False,
'shardKey': {'your_key': 'hashed'},
'collection': "test2",
}
(
df
.write
.options(**mongo_conf)
.format(mongo_format)
.mode("append").save()
)https://stackoverflow.com/questions/70335751
复制相似问题