我使用Snappydata和pyspark一起运行sql查询,并将输出DF转换为字典,将其大容量插入mongo中。我曾经历过许多类似的查询,以测试星星之火DF到Dictionary的转换。
目前,我使用map(lambda row: row.asDict(), x.collect())这个方法将我的大容量 DF转换为字典。10K记录需要2-3秒。
我已经在下面说明了我的想法的含义:
x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)有更快的路吗?
发布于 2017-12-07 15:08:08
我建议使用foreachPartition
(snappySession
.sql("select * from test")
.foreachPartition(insert_to_mongo))其中insert_to_mongo
def insert_to_mongo(rows):
client = ...
db = ...
db.collection.insert_many((row.asDict() for row in rows))发布于 2017-12-07 11:04:16
我会调查你是否可以直接写信给蒙戈从星火,因为这将是最好的方法。
否则,您可以使用以下方法:
x = snappySession.sql("select * from test")
dictionary_rdd = x.rdd.map(lambda row: row.asDict())
for d in dictionary_rdd.toLocalIterator():
db.collection.insert_many(d)这将以分布式的方式创建星火中的所有字典。这些行将返回到驱动程序中,并一次插入到Mongo中,这样您就不会耗尽内存。
https://stackoverflow.com/questions/47693295
复制相似问题