我想(在本地)测试将一些值插入到mongo数据库中。如果我运行这个:
import pymongo
import mongomock
@mongomock.patch(
servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']
table = get_mongodb_table()
table.insert_one({'a': 'b'}) # This works!
table.find_one({}){'a':'b','_id':ObjectId('5d5be9e853f24bf46d268d78')}
然而,下列情况却失败了:
import pyspark
import pymongo
import mongomock
SC = pyspark.SparkContext()
@mongomock.patch(
servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']
table = get_mongodb_table()
rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(table.insert_one) # This doesn't work!PicklingError:无法序列化对象: TypeError:‘数据库’对象不可调用
如何修正测试以避免引发错误?如何测试从rdd插入数据集到mongo数据库?
发布于 2019-08-20 13:45:13
您正在尝试将您的Mongo连接引用到您以前在RDD之外建立的RDD中。Spark正在尝试序列化此连接,以便在每个RDD中处理它,但由于数据库对象的属性,它无法序列化。
如何解决:您需要在RDD处理中创建连接。
import pyspark
import pymongo
import mongomock
SC = pyspark.SparkContext()
@mongomock.patch(
servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']
def create_and_insert(x):
table = get_mongodb_table()
table.insert_one(x)
rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(create_and_insert)但是,I强烈建议在上传到数据库时使用foreachPartition而不是foreach。foreach为每个元素创建一个单独的连接。foreachPartition为每个元素分区创建一个单独的连接,当元素的数量大于这里的数量时,这将是一个小得多的连接。
发布于 2019-08-20 18:20:22
如果您对使用库没有意见,可以使用以下解决方案:
import pymongo_spark
pymongo_spark.activate()
# save rdd to the empty mongodb collection
rdd.saveToMongoDB('mongodb://host_ip:port/db.collection')您还可以使用它将MongoDB集合读入RDD:
# create rdd for the mongodb collection
rdd = sc.mongoRDD('mongodb://host_ip:port/db.collection')
print(rdd.first())
print(rdd.count())https://stackoverflow.com/questions/57573964
复制相似问题