情况
我正在使用akka演员更新我的网络客户端的数据。这些参与者中的一个完全可以发送有关单个Agent的更新。这些代理更新非常快(每10 is更新一次)。我现在的目标是控制这种更新机制,以便每一个Agent的最新版本每300 My发送一次。
我的代码
到目前为止,这就是我想出来的:
/**
* Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
*/
class BroadcastSingleAgentActor extends Actor {
private implicit val ec: ExecutionContextExecutor = context.dispatcher
private var queue = Set[Agent]()
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
queue = Set()
}
override def receive: Receive = {
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
}问题
这个参与者(BroadcastSingleAgentActor)按预期工作,但我不能100%确定这是否是线程安全(更新queue,同时潜在地清除它)。而且,这并不意味着我在充分利用akka提供给我的工具。我找到了这个文章 (在Akka 2中节流消息),但我的问题是,我需要保留最新的Agent消息,同时删除它的任何旧版本。有没有类似我需要的地方的例子?
发布于 2019-03-26 16:36:27
不,这并不是线程安全,因为通过ActorSystem的调度将发生在另一个线程上,而不是receive。一个潜在的想法是在receive方法中执行调度,因为传入到BroadcastSingleAgentActor的消息将按顺序处理。
override def receive: Receive = {
case Refresh =>
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
}
queue = Set()
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}https://stackoverflow.com/questions/55361451
复制相似问题