首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >foreach %dopar% + RPostgreSQL

foreach %dopar% + RPostgreSQL
EN

Stack Overflow用户
提问于 2010-10-11 08:19:31
回答 2查看 3.7K关注 0票数 11

我正在使用RPostgreSQL连接到本地数据库。这个设置在我的Linux机器上运行得很好。R 2.11.1,Postgres 8.4。

我使用多核(doMC)并行后端的'foreach‘来包装一些重复的查询(有几千个),并将结果附加到一个数据结构中。奇怪的是,如果我使用%do%,它可以工作,但当我切换到%dopar%时,它就失败了,当只有一次迭代时例外(如下所示)

我想知道它是否与单个connection对象有关,所以我创建了10个connection对象,并根据“i”是什么,为该查询提供了一个特定的con对象,这取决于i模10。(下面仅用2个connection对象表示)。表达式的计算结果为eval(expr.01),包含/是取决于'i‘是什么的查询。

我无法理解这些特定的错误消息。我想知道是否有任何方法可以使这项工作。

谢谢。

Vishal Belsare

R代码片段如下:

代码语言:javascript
复制
> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
> id.qed2.foreach
[[1]]
  [1]   411   414  2140  2406  4490  4507  4519  4570  4571  4572  4703  4731
[109] 48765 84312 91797

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
> 

编辑:我更改了一些东西,(仍然不成功),但有一些事情浮出水面。在循环中创建且未通过dbDisconnect“断开连接”的连接对象会导致连接挂起,这在Postgres的/var/log中很明显。当我执行此操作时,会显示一些新的错误消息:

代码语言:javascript
复制
> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE, 
.packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL"); 
con0 <- dbConnect(drv0, dbname='nseindia');
list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters);
dbDisconnect(con0)})
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2010-10-11 23:26:45

下面的工作和速度比顺序形式提高了大约1.5倍。下一步,我想知道是否可以将一个连接对象附加到由registerDoMC产生的每个工作进程。如果是这样的话,就不需要创建/销毁连接对象,这可以防止连接淹没PostgreSQL服务器。

代码语言:javascript
复制
pgparquery <- function(i) {
drv <- dbDriver("PostgreSQL"); 
con <- dbConnect(drv, dbname='nsdq'); 
lst <- eval(expr.01); #contains the SQL query which depends on 'i'
qry <- dbSendQuery(con,lst);
tmp <- fetch(qry,n=-1);
dt <- dates.qed2[i]
dbDisconnect(con);
result <- list(date=dt, idreuters=tmp$idreuters)
return(result)}

id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE, .packages=c("DBI", "RPostgreSQL")) %dopar% {pgparquery(i)}

--

Vishal Belsare

票数 2
EN

Stack Overflow用户

发布于 2014-07-08 22:14:50

为每个工作进程创建一次数据库连接比为每个任务创建一次数据库连接更有效。不幸的是,mclapply没有提供在执行任务之前初始化工作进程的机制,因此使用doMC后端来实现这一点并不容易,但是如果你使用doParallel后端,你可以使用clusterEvalQ来初始化工作进程。下面是一个如何重构代码的示例:

代码语言:javascript
复制
library(doParallel)
cl <- makePSOCKcluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(DBI)
  library(RPostgreSQL)
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname="nsdq")
  NULL
})

id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE,
                          .noexport="con",
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  lst <- eval(expr.01)  #contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}

clusterEvalQ(cl, {
  dbDisconnect(con)
})

由于doParallel和clusterEvalQ使用相同的集群对象cl,因此在执行任务时,doParallel循环将能够访问数据库连接对象con

票数 20
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/3902796

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档