出于某种原因,我不得不同时使用gRPC和Akka。当这个演员作为一个顶级演员开始的时候,没有什么问题(在这个小演示中)。但是,当它成为一个子角色时,它将无法接收任何消息,并且会记录以下内容:
[default-akka.actor.default-dispatcher-6] [akka://default/user/Grpc] Message [AkkaMessage.package$GlobalStart] from Actor[akka://default/user/TrackerCore#-808631363] to Actor[akka://default/user/Grpc#-1834173068] was not delivered. [1] dead letters encountered.示例核心:
class GrpcActor() extends Actor {
val ec = scala.concurrent.ExecutionContext.global
val service = grpcService.bindService(new GrpcServerImpl(), ec)
override def receive: Receive = {
case GlobalStart() => {
println("GlobalStart")
}
...
}
}我尝试创建一个新的ExecutionContext,比如:
scala.concurrent.ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))为什么会发生这种情况,以及如何调试这样的死字母问题(无异常抛出)?
更新:
抱歉,我没有把所有的东西都列出来。我用普通的主要方法测试GrpcActor作为顶级演员,ScalaTest测试它作为子演员,这是一个错误。
class GrpcActorTest extends FlatSpec with Matchers{
implicit val system = ActorSystem()
val actor: ActorRef = system.actorOf(Props[GrpcActor])
actor ! GlobalStart()
}正是这个空测试套件主动关闭了整个参与者系统。但问题是这条线
val service = grpcService.bindService(new GrpcServerImpl(), ec)GlobalStart()的交付在关闭后被推迟。
如果没有该行,则可以在关机之前传递消息。
这是正常的行为吗?
(我猜:GlobalStart()是在关机消息之后排队的,这做了一些繁重的工作,造成了时间上的差异)
发布于 2017-10-15 21:50:44
解决这个问题的一种方法是使service成为一个lazy val
class GrpcActor extends Actor {
...
lazy val service = grpcService.bindService(new GrpcServerImpl(), ec)
...
}lazy val对于长期运行的操作非常有用:在本例中,它将service的初始化推迟到首次使用为止。如果没有lazy修饰符,则在创建参与者时初始化service。
另一种方法是在测试中添加一个Thread.sleep,以防止参与者系统在参与者完全初始化之前关闭:
class GrpcActorTest extends FlatSpec with Matchers {
...
actor ! GlobalStart()
Thread.sleep(5000) // or whatever length of time is needed to initialize the actor
}(顺便提一句,考虑在演员测试中使用Akka Testkit。)
发布于 2017-10-15 08:23:56
将监督策略添加到其父级,将println添加到参与者生命周期中。有些东西会害死你的演员。最后,如果您提供了一个完整的示例,也许我可以说更多:)
https://stackoverflow.com/questions/46752000
复制相似问题