首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka Stream throttle and ask模式

Akka Stream throttle and ask模式
EN

Stack Overflow用户
提问于 2017-07-31 23:12:25
回答 1查看 915关注 0票数 0

我之前使用的是来自akka.contrib的TimerBasedThrottler,但现在已被弃用,取而代之的是akka stream throttle。这是我的演员

代码语言:javascript
复制
class MyActor extends Actor{

    def receive = {
        case Message(msg) =>
        val date = LocalDateTime.now()
        println(s"getting: $msg @ $date from ${sender()}")
        sender() ! s"$msg ack"
    }
}

这是我的油门

代码语言:javascript
复制
val throttler = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
                      .throttle(1, 1.second, 1, ThrottleMode.shaping)
                      .to(Sink.actorRef(myActor, NotUsed))
                      .run()

我这样使用它:

代码语言:javascript
复制
val res = (throttler ? MyActor.Message("hello")).mapTo[String]

但是我得到了一个错误:

代码语言:javascript
复制
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://QuickStart/user/StreamSupervisor-0/flow-0-2 actorRefSource#355339300]] after [2000 ms]. Sender[null] sent message of type "MyActor$Message".
EN

回答 1

Stack Overflow用户

发布于 2017-08-01 00:24:45

这是行不通的,因为throttlerMyActor是不同的角色,因此它不会向发送者返回任何内容。虽然您自己的myActor确实会回复发送者,但在内部,传入的消息是由akka-streams内部发送的,它会忽略任何回复。毕竟,流中的消息总是在一个方向上流动(另一个方向是为需求保留的)。

您可以做的是通过消息将引用传递给原始参与者:

代码语言:javascript
复制
class Sender(myActor: ActorRef)
            (implicit materializer: Materializer) extends Actor {
  val throttler = Source.actorRef[(MyActor.Message, ActorRef)](bufferSize = 1000, OverflowStrategy.dropNew)
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .to(Sink.actorRef(myActor, NotUsed))
    .run()

  override def receive: Receive = {
    case DoIt =>
      throttler ! (MyActor.Message("hello"), self)
    case response: String =>
      println(s"received response: $response")
  }
}

class MyActor extends Actor {
  override def receive: Receive = {
    case (MyActor.Message(msg), originalSender: ActorRef) =>
      val date = LocalDateTime.now()
      println(s"getting: $msg @ $date from $originalSender")
      originalSender ! s"$msg ack"
  }
}

object MyActor {
  case class Message(msg: String)
}

当然,为了扩展,将原始发送者合并到消息类中可能更好,但基本方法保持不变。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45419519

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档