首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Akka-IO TCP初始化akka参与者

使用Akka-IO TCP初始化akka参与者
EN

Stack Overflow用户
提问于 2014-08-06 15:28:42
回答 2查看 797关注 0票数 3

使用Akka-IO TCP,在参与者中建立连接的过程如下:

代码语言:javascript
复制
class MyActor(remote: InetSocketAddress) extends Actor {

  IO(Tcp) ! Connect(remote)    //this is the first step, remote is the address to connect to

  def receive = {
    case CommandFailed(_: Connect) => context stop self // failed to connect

    case Connected(remote, local) =>
      val connection = sender()
      connection ! Register(self)
      // do cool things...  
  }
}

您向Connect发送一条IO(Tcp)消息,并期望接收一条CommandFailedConnected消息。

现在,我的目标是创建一个封装TCP连接的参与者,但我希望我的参与者只在连接建立后才开始接受消息--否则,在等待Connected消息时,它将开始接受查询,但没有人可以发送它们。

我试过的是:

代码语言:javascript
复制
class MyActor(address: InetSocketAddress) extends Actor {

  def receive = {
    case Initialize =>
      IO(Tcp) ! Connect(address)
      context.become(waitForConnection(sender()))

    case other => sender ! Status.Failure(new Exception(s"Connection to $address not established yet."))
  }

  private def waitForConnection(initializer: ActorRef): Receive = {
    case Connected(_, _) =>
      val connection = sender()
      connection ! Register(self)
      initializer ! Status.Success(Unit)
      // do cool things

    case CommandFailed(_: Connect) =>
      initializer ! Status.Failure(new Exception("Failed to connect to " + host))
      context stop self
  }
}

我的第一个receive是期待一条虚构的Initialize消息,它将触发整个连接过程,一旦完成,Initialize的发送方就会收到一条成功的消息,并知道它可以知道开始发送查询。

我对此不太满意,这迫使我用

代码语言:javascript
复制
val actor = system.actorOf(MyActor.props(remote))
Await.ready(actor ? Initialize, timeout)

而且它不会是非常“重启”友好的。

可以保证在Tcp层用Connected应答之前,我的参与者不会开始从邮箱接收消息。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-08-06 16:31:14

使用藏物特性存储您现在无法处理的消息。当每个不成熟的消息到达时,请使用stash()来推迟它。一旦连接打开,使用unstashAll()将这些消息返回到邮箱进行处理。然后可以使用become()切换到消息处理状态。

票数 5
EN

Stack Overflow用户

发布于 2014-08-07 07:12:06

为了使您的参与者更易于重新启动,您可以覆盖与参与者生命周期相关的方法,如preStartpostStopAkka文件对演员的开始、停止和重新启动挂钩都有很好的解释。

代码语言:javascript
复制
class MyActor(remote: InetSocketAddress) extends Actor {

  override def preStart() {
    IO(Tcp) ! Connect(remote) 
  }

  ...
}

现在你可以用val actor = system.actorOf(MyActor.props(remote))开始你的演员了。它在启动时建立连接,在重新启动时重新建立连接。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25164389

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档