首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如果抛出异常,Akka Actor不会终止

如果抛出异常,Akka Actor不会终止
EN

Stack Overflow用户
提问于 2011-05-30 04:25:46
回答 3查看 8.7K关注 0票数 73

我目前正在尝试开始使用Akka,但我面临着一个奇怪的问题。我为我的Actor编写了以下代码:

代码语言:javascript
复制
class AkkaWorkerFT extends Actor {
  def receive = {
    case Work(n, c) if n < 0 => throw new Exception("Negative number")
    case Work(n, c) => self reply n.isProbablePrime(c);
  }
}

这就是我如何开始我的员工:

代码语言:javascript
复制
val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

这就是我关闭一切的方法:

代码语言:javascript
复制
futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill

现在发生的情况是,如果我发送n>0的workers消息(没有抛出异常),一切都会正常工作,应用程序也会正常关闭。但是,只要我向它发送一条导致异常的消息,应用程序就不会终止,因为仍然有一个参与者在运行,但我不知道它来自哪里。

如果有帮助,下面是有问题的线程的堆栈:

代码语言:javascript
复制
  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) 
    Unsafe.park(boolean, long) line: not available [native method]  
    LockSupport.park(Object) line: 158  
    AbstractQueuedSynchronizer$ConditionObject.await() line: 1987   
    LinkedBlockingQueue<E>.take() line: 399 
    ThreadPoolExecutor.getTask() line: 947  
    ThreadPoolExecutor$Worker.run() line: 907   
    MonitorableThread(Thread).run() line: 680   
    MonitorableThread.run() line: 182   

PS:没有终止的线程不是任何工作线程,因为我添加了一个postStop回调,每个线程都会正常停止。

Actors.registry.shutdownAll解决了这个问题,但我认为shutdownAll只应该作为最后的手段使用,不是吗?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2011-11-12 02:34:30

在akka参与者中处理问题的正确方法不是抛出异常,而是设置主管层次结构

“在并发代码中抛出异常(让我们假设我们使用的是非链接的参与者),只会简单地炸毁当前执行该参与者的线程。

没有办法发现错误(除了检查堆栈跟踪)。你对此无能为力。“

请参阅Fault Tolerance Through Supervisor Hierarchies (1.2)

*注意*对于Akka的旧版本(1.2)来说,上述情况是正确的。在新版本(例如2.2)中,您仍然可以设置一个supervisor层次结构,但它会捕获子进程抛出的异常。例如:

代码语言:javascript
复制
class Child extends Actor {
    var state = 0
    def receive = {
      case ex: Exception ⇒ throw ex
      case x: Int        ⇒ state = x
      case "get"         ⇒ sender ! state
    }
  }

在supervisor中:

代码语言:javascript
复制
class Supervisor extends Actor {
    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException      ⇒ Resume
        case _: NullPointerException     ⇒ Restart
        case _: IllegalArgumentException ⇒ Stop
        case _: Exception                ⇒ Escalate
      }

    def receive = {
      case p: Props ⇒ sender ! context.actorOf(p)
    }
  }

请参阅Fault Tolerance Through Supervisor Hierarchies (2.2)

票数 23
EN

Stack Overflow用户

发布于 2012-01-06 03:36:25

按照Viktor的建议,关闭日志记录以确保事件终止,这有点奇怪。相反,您可以做的是:

代码语言:javascript
复制
EventHandler.shutdown()

这干净利落地关闭了所有(记录器)侦听器,这些侦听器在异常之后保持运行:

代码语言:javascript
复制
def shutdown() {
  foreachListener(_.stop())
  EventHandlerDispatcher.shutdown()
}
票数 2
EN

Stack Overflow用户

发布于 2011-11-09 17:17:12

akka.conf中打开记录器

票数 -3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/6170227

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档