我有一个ActorA,它从输入流中读取并发送消息给一组ActorB。当ActorA到达输入流的末尾时,它会清理它的资源,向ActorB广播一条已完成的消息,并关闭它自己。
我有大约12个ActorB,它们向ActorC的一组发送消息。当ActorB收到来自ActorA的to消息时,它会清理它的资源并关闭自己,除了最后一个幸存的ActorB,它在关闭自己之前向ActorC广播一个to消息。
我有大约24个ActorC向单个ActorD发送消息。与ActorB类似,当每个ActorC获得一条to消息时,它会清理其资源并关闭自己,但最后一个幸存的ActorC除外,后者向ActorD发送一条to消息。
当ActorD收到一条完成消息时,它会清理它的资源并关闭它自己。
最初,我让ActorB和ActorC在他们收到消息时立即传播to消息,但这可能会导致ActorC在所有ActorB完成其队列处理之前关闭;同样,ActorC的ActorD可能在ActorC完成队列处理之前关闭。
我的解决方案是使用ActorB之间共享的AtomicInteger
class ActorB(private val actorCRouter: ActorRef,
private val actorCount: AtomicInteger) extends Actor {
private val init = {
actorCount.incrementAndGet()
()
}
def receive = {
case Done => {
if(actorCount.decrementAndGet() == 0) {
actorCRouter ! Broadcast(Done)
}
// clean up resources
context.stop(self)
}
}
}ActorC使用类似的代码,每个ActorC共享一个AtomicInteger。
目前,所有参与者都是在web服务方法中初始化的,下游ActorRef在上游参与者的构造器中传递。
是否有更好的方法来做到这一点,例如使用对Akka方法的调用而不是使用AtomicInteger?
编辑:我正在考虑以下可能的替代方案:当参与者收到一条完成消息时,它将接收超时设置为5秒(程序将花费一个多小时的时间运行,因此将清理/关机延迟几秒钟不会影响性能);当演员获得ReceiveTimeout时,它会向下游的参与者广播、清理和关闭。( ActorB和ActorC的路由器使用SmallestMailboxRouter)
class ActorB(private val actorCRouter: ActorRef) extends Actor {
def receive = {
case Done => {
context.setReceiveTimeout(Duration.create(5, SECONDS))
}
case ReceiveTimeout => {
actorCRouter ! Broadcast(Done)
// clean up resources
context.stop(self)
}
}
}发布于 2014-10-05 20:04:18
在相关行为者之间共享actorCount并不是件好事。参与者应该只使用自己的状态来处理消息。让ActorBCompletionHanlder演员做ActorB类型的演员怎么样?所有ActorB都将引用ActorBCompletionHanlder参与者。每次ActorB收到已完成的消息时,它都可以进行必要的清理,只需将完成的消息传递给ActorBCompletionHanlder。ActorBCompletionHanlder将维护状态变量以保持计数。每次收到已完成的消息时,它都可以简单地更新计数器。因为这是这个参与者的唯一状态变量,所以不需要它是原子的,这样就不需要任何显式锁定。一旦收到最后完成的消息,ActorBCompletionHanlder将向ActorC发送已完成的消息。这样,activeCount的共享不是在参与者之间,而是由ActorBCompletionHanlder管理的。对于其他类型,也可以重复相同的事情。
A-> B's -> BCompletionHanlder -> C's -> CCompletionHandler -> D
其他办法可以是为每一组相关行为者设立一个监测行为体。使用监视器上的watch api和终止子事件,您可以选择在收到最后完成的消息后决定要做什么。
val child = context.actorOf(Props[ChildActor])
context.watch(child)
case Terminated(child) => {
log.info(child + " Child actor terminated")
} https://stackoverflow.com/questions/26090815
复制相似问题