首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何确定所有演员都收到了广播信息?

如何确定所有演员都收到了广播信息?
EN

Stack Overflow用户
提问于 2014-09-28 23:56:54
回答 1查看 175关注 0票数 2

我有一个ActorA,它从输入流中读取并发送消息给一组ActorB。当ActorA到达输入流的末尾时,它会清理它的资源,向ActorB广播一条已完成的消息,并关闭它自己。

我有大约12个ActorB,它们向ActorC的一组发送消息。当ActorB收到来自ActorA的to消息时,它会清理它的资源并关闭自己,除了最后一个幸存的ActorB,它在关闭自己之前向ActorC广播一个to消息。

我有大约24个ActorC向单个ActorD发送消息。与ActorB类似,当每个ActorC获得一条to消息时,它会清理其资源并关闭自己,但最后一个幸存的ActorC除外,后者向ActorD发送一条to消息。

当ActorD收到一条完成消息时,它会清理它的资源并关闭它自己。

最初,我让ActorB和ActorC在他们收到消息时立即传播to消息,但这可能会导致ActorC在所有ActorB完成其队列处理之前关闭;同样,ActorC的ActorD可能在ActorC完成队列处理之前关闭。

我的解决方案是使用ActorB之间共享的AtomicInteger

代码语言:javascript
复制
class ActorB(private val actorCRouter: ActorRef,
             private val actorCount: AtomicInteger) extends Actor {
  private val init = {
    actorCount.incrementAndGet()
    ()
  }

  def receive = {
    case Done => {
      if(actorCount.decrementAndGet() == 0) {
        actorCRouter ! Broadcast(Done)
      }
      // clean up resources
      context.stop(self)
    }
  }
}

ActorC使用类似的代码,每个ActorC共享一个AtomicInteger。

目前,所有参与者都是在web服务方法中初始化的,下游ActorRef在上游参与者的构造器中传递。

是否有更好的方法来做到这一点,例如使用对Akka方法的调用而不是使用AtomicInteger?

编辑:我正在考虑以下可能的替代方案:当参与者收到一条完成消息时,它将接收超时设置为5秒(程序将花费一个多小时的时间运行,因此将清理/关机延迟几秒钟不会影响性能);当演员获得ReceiveTimeout时,它会向下游的参与者广播、清理和关闭。( ActorB和ActorC的路由器使用SmallestMailboxRouter)

代码语言:javascript
复制
class ActorB(private val actorCRouter: ActorRef) extends Actor {

  def receive = {
    case Done => {
      context.setReceiveTimeout(Duration.create(5, SECONDS))
    }

    case ReceiveTimeout => {
      actorCRouter ! Broadcast(Done)
      // clean up resources
      context.stop(self)
    }
  }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-10-05 20:04:18

在相关行为者之间共享actorCount并不是件好事。参与者应该只使用自己的状态来处理消息。让ActorBCompletionHanlder演员做ActorB类型的演员怎么样?所有ActorB都将引用ActorBCompletionHanlder参与者。每次ActorB收到已完成的消息时,它都可以进行必要的清理,只需将完成的消息传递给ActorBCompletionHanlder。ActorBCompletionHanlder将维护状态变量以保持计数。每次收到已完成的消息时,它都可以简单地更新计数器。因为这是这个参与者的唯一状态变量,所以不需要它是原子的,这样就不需要任何显式锁定。一旦收到最后完成的消息,ActorBCompletionHanlder将向ActorC发送已完成的消息。这样,activeCount的共享不是在参与者之间,而是由ActorBCompletionHanlder管理的。对于其他类型,也可以重复相同的事情。

A-> B's -> BCompletionHanlder -> C's -> CCompletionHandler -> D

其他办法可以是为每一组相关行为者设立一个监测行为体。使用监视器上的watch api和终止子事件,您可以选择在收到最后完成的消息后决定要做什么。

代码语言:javascript
复制
val child = context.actorOf(Props[ChildActor])
    context.watch(child)

    case Terminated(child) => {
        log.info(child + " Child actor terminated")
    } 
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/26090815

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档