我在conf文件中有下面的条目。但我不确定这个dispatcher设置是否被选中,并且使用的最终并行值是什么?
akka{
actor{
default-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
throughput = 3
fork-join-executor {
parallelism-min = 40
parallelism-factor = 10
parallelism-max = 100
}
}
}
}我有8台核心机器,所以我希望 80 并行线程处于就绪状态,40分钟<80 (8*10因子)< 100max。我想看看akka为max并行线程使用了什么值。我创建了45个子角色,在我的日志中,我打印线程id应用程序-akka.actor.Default-dispatcher-xx,并且我看不到超过20个线程并行运行。
发布于 2017-02-22 20:19:30
假设您有一个ActorSystem实例,您可以检查它的配置中设置的值。这是如何获得在配置文件中设置的值的方法:
val system = ActorSystem()
val config = system.settings.config.getConfig("akka.actor.default-dispatcher")
config.getString("type")
config.getString("executor")
config.getString("throughput")
config.getInt("fork-join-executor.parallelism-min")
config.getInt("fork-join-executor.parallelism-max")
config.getDouble("fork-join-executor.parallelism-factor")我希望这能帮到你。有关特定配置设置的更多详细信息,还可以参考这页面。
更新
我已经在Akka挖掘了更多,以了解它对您的设置的确切用途。正如您可能已经预料到的,它使用了一个ForkJoinPool。用于构建它的并行性是由以下方面提供的:
object ThreadPoolConfig {
...
def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int =
math.min(math.max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling)
...
}这个函数在某个时候被用来构建一个ForkJoinExecutorServiceFactory
new ForkJoinExecutorServiceFactory(
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode)无论如何,这是用于创建ForkJoinPool的并行性,它实际上是java.lang.ForkJoinPool的一个实例。现在我们必须问这个池使用了多少个线程?简单的回答是,只有在需要时,它才会使用整个容量(在本例中为80个线程)。
为了演示这个场景,我在参与者内部运行了几个使用Thread.sleep的测试。我发现,它可以从大约10个线程(如果没有进行睡眠调用)使用到最大80个线程(如果我调用睡眠1秒)。试验是在一台有8个核心的机器上进行的。
总之,您需要检查Akka所使用的实现,以确切地了解如何使用并行性,这就是我研究ForkJoinPool的原因。除了查看配置文件,然后检查特定的实现之外,我认为不幸的是,您不能这样做:
我希望这能澄清答案--一开始我以为您想看看参与者系统的调度器是如何配置的。
发布于 2017-02-22 21:30:45
为了最大限度地提高并行性,所有的参与者都需要同时处理一些消息。您确定在您的应用程序中是这样的吗?
以下面的代码为例
object Test extends App {
val system = ActorSystem()
(1 to 80).foreach{ _ =>
val ref = system.actorOf(Props[Sleeper])
ref ! "hello"
}
}
class Sleeper extends Actor {
override def receive: Receive = {
case msg =>
//Thread.sleep(60000)
println(msg)
}
}如果您考虑您的配置和8个内核,您将看到少量线程正在生成(4,5?)由于消息的处理速度太快,以至于无法建立真正的并行性。
相反,如果您保持您的参与者CPU-不注释讨厌的Thread.sleep,您将看到线程的数量将高达80个。然而,这将只持续1分钟,之后线程将逐渐从池中退出。
我想主要的诀窍是:不要把每个演员都放在一个单独的线程上。当一个或多个消息出现在一个参与者的邮箱上时,分配器就会醒来,并且--实际上--将消息处理任务分派到指定的池中。
https://stackoverflow.com/questions/42396558
复制相似问题