我正在尝试编写一个具有多线程功能的CRAN包。我用doSNOW实现了一个完美的解决方案,但是这个包已经被CRAN团队标记为“替代”了,他们让我切换到doParallel解决方案。这很好,但是我无法找到一种方法来监视使用doParallel完成的作业数量,就像使用doSNOW时一样。以下是我的doSNOW解决方案:
# Set up parameters
nthreads<-2
nreps<-100
funrep<-function(i){
Sys.sleep(0.1)
res<-c(log2(i),log10(i))
return(res)
}
# doSNOW solution
library(doSNOW)
cl<-makeCluster(nthreads)
registerDoSNOW(cl)
pb<-txtProgressBar(0,nreps,style=3)
progress<-function(n){
setTxtProgressBar(pb,n)
}
opts<-list(progress=progress)
i<-0
output<-foreach(i=icount(nreps),.combine=c,.options.snow=opts) %dopar% {
s<-funrep(i)
return(s)
}
close(pb)
stopCluster(cl)这是一个doParallel解决方案,如建议的在以前的堆栈溢出帖子中。但是,正如您所看到的,它不会在完成任务时打印进度,它只是在结果组合之后,在最后才打印出来。
# doParallel solution
library(doParallel)
progcombine<-function(){
count<-0
function(...) {
count<<-count+length(list(...))
setTxtProgressBar(pb,count)
utils::flush.console()
c(...)
}
}
cl <- makeCluster(nthreads)
registerDoParallel(cl)
output<-foreach(i = icount(nreps),.combine=progcombine()) %dopar% {
funrep(i)
}
stopCluster(cl)您能建议我使用doParallel或至少不使用取代的doSNOW来监视作业状态完成情况的解决方案吗?可能有一个进度条,也可能有多操作系统的能力。非常感谢!
发布于 2019-10-20 14:09:10
我无法用doParallel找到解决方案(我认为它不支持完成作业的进度条),但也许您可以尝试新的包pbapply
# pbapply solution
library(pbapply)
cl<-parallel::makeCluster(nthreads)
invisible(parallel::clusterExport(cl=cl,varlist=c("nreps")))
invisible(parallel::clusterEvalQ(cl=cl,library(utils)))
result<-pblapply(cl=cl,
X=1:nreps,
FUN=funrep)
parallel::stopCluster(cl)发布于 2019-10-20 21:19:07
(否认:我是进步软件包和未来框架的作者)
当使用进度r作为前程的并行后端时,doFuture包可以实现这一点。
library(progressr) ## use progressr for procession updates
library(doFuture) ## attaches also foreach and future
registerDoFuture() ## tell foreach to use futures
plan(multisession) ## parallelize over a local PSOCK cluster
xs <- 1:5
with_progress({
p <- progressor(along = xs) ## create a 5-step progressor
y <- foreach(x = xs) %dopar% {
p() ## signal a progression update
Sys.sleep(6.0-x)
sqrt(x)
}
})默认情况是使用utils::txtProgressBar()进行进度报告,但可以更改。例如,以下内容将通过progress::progress_bar()和beepr::beep()报告进度更新
progressr::handlers("progress", "beepr")您还可以为每次进度更新添加消息。
p(sprintf("x=%g", x))plan(multisession, workers = 2)是plan(cluster, workers = cl)的缩写,cl基本上是cl <- parallel::makeCluster(2L)。
PS。progressr包的目标是为进度更新提供一个最小的、可持续的、可扩展的和统一的API。这一点,但与迭代器框架的使用无关。
PPS。progressr正在开发中;它可能需要一段时间才能确定自己的真实身份。
发布于 2019-10-20 14:24:46
使用surveillance::plapply的解决方案。进度条的处理在要并行化的函数中实现。
funrep2 <- function(i){
i <<- i + 1
setTxtProgressBar(pb, i)
res <- c(log2(i), log10(i))
Sys.sleep(0.1)
return(res)
}在将对象导出到集群之后,处理方式与parallel解决方案类似:
library(parallel)
pb <- txtProgressBar(max=nreps, style=3)
cl <- makeCluster(nthreads)
clusterExport(cl, c("pb", "funrep2"), envir=environment())
clusterEvalQ(cl, library(surveillance))
i <- 0
output <- surveillance::plapply(1:nreps, "funrep2")
stopCluster(cl)
close(pb)
# |=============================================================================... | 85%https://stackoverflow.com/questions/58473626
复制相似问题