我有一个pyspark Dataframe,现在我想遍历每一行并插入/更新到mongoDB集合。
#Did every required imports
#dataframe
+---+----+
|age|name|
+---+----+
| 30| c|
| 5| e|
| 6| f|
+---+----+
db = mongodbclient['mydatabase']
collection = db['mycollection']
#created below function to insert/update
def customFunction(row):
key = {'name':row.name}
data = dict(zip(columns,[row.x for x in columns]))
collection.update(key, data, {upsert:true})
#return a_flag #commented it as of now, a_flag can be 0 or 1如果名称存在于mongoDB集合'mycollection‘中,它应该更新行/记录,否则插入新记录。
当我试图在spark-dataframe上映射这个函数时,我得到了以下错误
result = my_dataframe.rdd.map(customFunction)
#.....TypeError: can't pickle _thread.lock objects....
#AttributeError: 'TypeError' object has no attribute 'message'有没有人能找出‘这个函数和/或其他地方哪里出了问题’,或者请建议是否有任何其他类型的任务可以替代。
基本上迭代每一行(没有collect调用,这是可能的吗?)
并且,在每一行上应用一个函数来运行外部spark工作。
请提前建议,谢谢..:)
我在mongoDB中的数据
name age
a 1
b 2
c 3 #new update should make age as 30 and 2 more new recs should inserted发布于 2017-09-08 00:48:18
看起来连接对象不能进行pickled。我会使用foreachPartition
def customFunction(rows):
db = mongodbclient['mydatabase']
collection = db['mycollection']
for row in rows:
key = {'name':row.name}
data = dict(zip(columns,[row.x for x in columns]))
collection.update(key, data, {upsert:true})
my_dataframe.rdd.foreachPartition(customFunction)但请记住,致命故障可能会使数据库处于不一致的状态。
发布于 2017-09-08 02:16:52
如果要在MongoDB中插入500k条记录,则批量模式可能是更有效的处理方式。与在spark中实际执行请求相比,在mongoDB中执行请求需要更多的电力(仅仅是创建请求),甚至并行执行都可能导致mongo端的不稳定(并且比“迭代”方法慢)。
您可以尝试以下代码。它不使用collect(),所以它在驱动程序上是内存高效的:
bulk = collection.initialize_unordered_bulk_op()
for row in rdd.toLocalIterator():
key = {'name':row.name}
data = dict(zip(columns,[row.x for x in columns]))
bulk.update(key, data, {upsert:true})
print(bulk.execute())https://stackoverflow.com/questions/46091855
复制相似问题