首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >测试将值插入到mongodb中(pyspark,pymongo)

测试将值插入到mongodb中(pyspark,pymongo)
EN

Stack Overflow用户
提问于 2019-08-20 12:42:54
回答 2查看 736关注 0票数 1

我想(在本地)测试将一些值插入到mongo数据库中。如果我运行这个:

代码语言:javascript
复制
import pymongo
import mongomock

@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']

table = get_mongodb_table()
table.insert_one({'a': 'b'})  # This works!
table.find_one({})

{'a':'b','_id':ObjectId('5d5be9e853f24bf46d268d78')}

然而,下列情况却失败了:

代码语言:javascript
复制
import pyspark
import pymongo
import mongomock

SC = pyspark.SparkContext()

@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']

table = get_mongodb_table()

rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(table.insert_one)  # This doesn't work!

PicklingError:无法序列化对象: TypeError:‘数据库’对象不可调用

如何修正测试以避免引发错误?如何测试从rdd插入数据集到mongo数据库?

EN

回答 2

Stack Overflow用户

发布于 2019-08-20 13:45:13

您正在尝试将您的Mongo连接引用到您以前在RDD之外建立的RDD中。Spark正在尝试序列化此连接,以便在每个RDD中处理它,但由于数据库对象的属性,它无法序列化。

如何解决:您需要在RDD处理中创建连接。

代码语言:javascript
复制
import pyspark
import pymongo
import mongomock

SC = pyspark.SparkContext()

@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']

def create_and_insert(x):
    table = get_mongodb_table()
    table.insert_one(x)

rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(create_and_insert)

但是,I强烈建议在上传到数据库时使用foreachPartition而不是foreach。foreach为每个元素创建一个单独的连接。foreachPartition为每个元素分区创建一个单独的连接,当元素的数量大于这里的数量时,这将是一个小得多的连接。

票数 1
EN

Stack Overflow用户

发布于 2019-08-20 18:20:22

如果您对使用库没有意见,可以使用以下解决方案:

代码语言:javascript
复制
import pymongo_spark
pymongo_spark.activate()

# save rdd to the empty mongodb collection
rdd.saveToMongoDB('mongodb://host_ip:port/db.collection')

您还可以使用它将MongoDB集合读入RDD:

代码语言:javascript
复制
# create rdd for the mongodb collection
rdd = sc.mongoRDD('mongodb://host_ip:port/db.collection')
print(rdd.first())
print(rdd.count())
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57573964

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档