首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >提高scala .par操作的并行性水平

提高scala .par操作的并行性水平
EN

Stack Overflow用户
提问于 2019-06-11 10:09:16
回答 1查看 664关注 0票数 2

当我在集合上调用par时,它似乎创建了大约5-10个线程,这对于绑定CPU的任务来说是很好的。

但有时我有一些任务是IO绑定的,在这种情况下,我希望有500-1000个线程同时从IO中提取--执行10-15个线程非常慢,而且我看到我的CPU大多处于空闲状态。

我怎样才能做到这一点?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-11 11:32:47

您可以将阻塞io操作包装在blocking块中:

代码语言:javascript
复制
(0 to 1000).par.map{ i =>
    blocking {
      Thread.sleep(100)
      Thread.activeCount()
    }
}.max // yield 67 on my pc, while without blocking it's 10

但是您应该问自己一个问题,如果您应该使用并行集合来执行IO操作。他们的用例是执行CPU繁重的任务。

我建议你考虑利用期货进行IO呼叫。

您还应该考虑为该任务使用自定义的执行上下文,因为全局执行上下文是一个公共的单例,并且您无法控制代码使用的内容和用途。如果使用外部库中的所有线程,则可以很容易地饿死由外部库创建的并行计算。

代码语言:javascript
复制
// or just use scala.concurrent.ExecutionContext.Implicits.global if you don't care
implicit val blockingIoEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(
    Executors.newCachedThreadPool()
) 

def fetchData(index: Int): Future[Int] =  Future {
   //if you use global ec, then it's required to mark computation as blocking to increase threads,
   //if you use custom cached thread pool it should increase thread number even without it
    blocking { 
      Thread.sleep(100)
      Thread.activeCount()
    }
}

val futures = (0 to 1000).map(fetchData)

Future.sequence(futures).onComplete {
    case Success(data) => println(data.max) //prints about 1000 on my pc
}

Thread.sleep(1000)

编辑

还可以使用ForkJoinTaskSupport使用自定义ForkJoinPool

代码语言:javascript
复制
import java.util.concurrent.ForkJoinPool //scala.concurrent.forkjoin.ForkJoinPool is deprecated
import scala.util.Random
import scala.collection.parallel

val fjpool = new ForkJoinPool(2) 
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool) 

val numbers = List(1,2,3,4,5).par 

numbers.tasksupport = customTaskSupport //assign customTaskSupport
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56541362

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档