首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >遍历pyspark Dataframe,然后针对每一行与mongoDB交互

遍历pyspark Dataframe,然后针对每一行与mongoDB交互
EN

Stack Overflow用户
提问于 2017-09-07 16:46:08
回答 2查看 6.8K关注 0票数 2

我有一个pyspark Dataframe,现在我想遍历每一行并插入/更新到mongoDB集合。

代码语言:javascript
复制
 #Did every required imports 
 #dataframe
 +---+----+
 |age|name|
 +---+----+
 | 30|   c|
 |  5|   e|
 |  6|   f|
 +---+----+
  db = mongodbclient['mydatabase']
  collection = db['mycollection']
  #created below function to insert/update
  def customFunction(row):
      key = {'name':row.name}
      data = dict(zip(columns,[row.x for x in columns]))
      collection.update(key, data, {upsert:true})
      #return a_flag #commented it as of now, a_flag can be 0 or 1

如果名称存在于mongoDB集合'mycollection‘中,它应该更新行/记录,否则插入新记录。

当我试图在spark-dataframe上映射这个函数时,我得到了以下错误

代码语言:javascript
复制
 result = my_dataframe.rdd.map(customFunction)
 #.....TypeError: can't pickle _thread.lock objects....
 #AttributeError: 'TypeError' object has no attribute 'message'

有没有人能找出‘这个函数和/或其他地方哪里出了问题’,或者请建议是否有任何其他类型的任务可以替代。

基本上迭代每一行(没有collect调用,这是可能的吗?)

并且,在每一行上应用一个函数来运行外部spark工作。

请提前建议,谢谢..:)

我在mongoDB中的数据

代码语言:javascript
复制
name  age
 a    1
 b    2
 c    3 #new update should make age as 30 and 2 more new recs should inserted
EN

回答 2

Stack Overflow用户

发布于 2017-09-08 00:48:18

看起来连接对象不能进行pickled。我会使用foreachPartition

代码语言:javascript
复制
def customFunction(rows):
    db = mongodbclient['mydatabase']
    collection = db['mycollection']

    for row in rows:
        key = {'name':row.name}
        data = dict(zip(columns,[row.x for x in columns]))
        collection.update(key, data, {upsert:true})

my_dataframe.rdd.foreachPartition(customFunction)

但请记住,致命故障可能会使数据库处于不一致的状态。

票数 1
EN

Stack Overflow用户

发布于 2017-09-08 02:16:52

如果要在MongoDB中插入500k条记录,则批量模式可能是更有效的处理方式。与在spark中实际执行请求相比,在mongoDB中执行请求需要更多的电力(仅仅是创建请求),甚至并行执行都可能导致mongo端的不稳定(并且比“迭代”方法慢)。

您可以尝试以下代码。它不使用collect(),所以它在驱动程序上是内存高效的:

代码语言:javascript
复制
bulk = collection.initialize_unordered_bulk_op()
for row in rdd.toLocalIterator():
    key = {'name':row.name}
    data = dict(zip(columns,[row.x for x in columns]))
    bulk.update(key, data, {upsert:true})

print(bulk.execute())
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46091855

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档