首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何检查在akka应用程序中配置了什么调度程序

如何检查在akka应用程序中配置了什么调度程序
EN

Stack Overflow用户
提问于 2017-02-22 16:05:30
回答 2查看 1K关注 0票数 1

我在conf文件中有下面的条目。但我不确定这个dispatcher设置是否被选中,并且使用的最终并行值是什么?

代码语言:javascript
复制
        akka{
      actor{
        default-dispatcher {
              type = Dispatcher
              executor = "fork-join-executor"
              throughput = 3
              fork-join-executor {
                parallelism-min = 40
                parallelism-factor = 10
                parallelism-max = 100
              }
            }
       }
    }

我有8台核心机器,所以我希望 80 并行线程处于就绪状态,40分钟<80 (8*10因子)< 100max。我想看看akka为max并行线程使用了什么值。我创建了45个子角色,在我的日志中,我打印线程id应用程序-akka.actor.Default-dispatcher-xx,并且我看不到超过20个线程并行运行。

EN

回答 2

Stack Overflow用户

发布于 2017-02-22 20:19:30

假设您有一个ActorSystem实例,您可以检查它的配置中设置的值。这是如何获得在配置文件中设置的值的方法:

代码语言:javascript
复制
val system = ActorSystem()
val config = system.settings.config.getConfig("akka.actor.default-dispatcher")
config.getString("type")
config.getString("executor")
config.getString("throughput")
config.getInt("fork-join-executor.parallelism-min")
config.getInt("fork-join-executor.parallelism-max")
config.getDouble("fork-join-executor.parallelism-factor")

我希望这能帮到你。有关特定配置设置的更多详细信息,还可以参考页面。

更新

我已经在Akka挖掘了更多,以了解它对您的设置的确切用途。正如您可能已经预料到的,它使用了一个ForkJoinPool。用于构建它的并行性是由以下方面提供的:

代码语言:javascript
复制
object ThreadPoolConfig {
  ...
  def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int =
math.min(math.max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling)
  ...
}

这个函数在某个时候被用来构建一个ForkJoinExecutorServiceFactory

代码语言:javascript
复制
new ForkJoinExecutorServiceFactory(
  validate(tf),
  ThreadPoolConfig.scaledPoolSize(
    config.getInt("parallelism-min"),
    config.getDouble("parallelism-factor"),
    config.getInt("parallelism-max")),
  asyncMode)

无论如何,这是用于创建ForkJoinPool的并行性,它实际上是java.lang.ForkJoinPool的一个实例。现在我们必须问这个池使用了多少个线程?简单的回答是,只有在需要时,它才会使用整个容量(在本例中为80个线程)。

为了演示这个场景,我在参与者内部运行了几个使用Thread.sleep的测试。我发现,它可以从大约10个线程(如果没有进行睡眠调用)使用到最大80个线程(如果我调用睡眠1秒)。试验是在一台有8个核心的机器上进行的。

总之,您需要检查Akka所使用的实现,以确切地了解如何使用并行性,这就是我研究ForkJoinPool的原因。除了查看配置文件,然后检查特定的实现之外,我认为不幸的是,您不能这样做:

我希望这能澄清答案--一开始我以为您想看看参与者系统的调度器是如何配置的。

票数 0
EN

Stack Overflow用户

发布于 2017-02-22 21:30:45

为了最大限度地提高并行性,所有的参与者都需要同时处理一些消息。您确定在您的应用程序中是这样的吗?

以下面的代码为例

代码语言:javascript
复制
object Test extends App {

  val system = ActorSystem()

  (1 to 80).foreach{ _ =>
    val ref = system.actorOf(Props[Sleeper])
    ref ! "hello"
  }
}

class Sleeper extends Actor {
  override def receive: Receive = {
    case msg =>
      //Thread.sleep(60000)
      println(msg)
  }
}

如果您考虑您的配置和8个内核,您将看到少量线程正在生成(4,5?)由于消息的处理速度太快,以至于无法建立真正的并行性。

相反,如果您保持您的参与者CPU-不注释讨厌的Thread.sleep,您将看到线程的数量将高达80个。然而,这将只持续1分钟,之后线程将逐渐从池中退出。

我想主要的诀窍是:不要把每个演员都放在一个单独的线程上。当一个或多个消息出现在一个参与者的邮箱上时,分配器就会醒来,并且--实际上--将消息处理任务分派到指定的池中。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42396558

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档