我之前使用的是来自akka.contrib的TimerBasedThrottler,但现在已被弃用,取而代之的是akka stream throttle。这是我的演员
class MyActor extends Actor{
def receive = {
case Message(msg) =>
val date = LocalDateTime.now()
println(s"getting: $msg @ $date from ${sender()}")
sender() ! s"$msg ack"
}
}这是我的油门
val throttler = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.to(Sink.actorRef(myActor, NotUsed))
.run()我这样使用它:
val res = (throttler ? MyActor.Message("hello")).mapTo[String]但是我得到了一个错误:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://QuickStart/user/StreamSupervisor-0/flow-0-2 actorRefSource#355339300]] after [2000 ms]. Sender[null] sent message of type "MyActor$Message".发布于 2017-08-01 00:24:45
这是行不通的,因为throttler与MyActor是不同的角色,因此它不会向发送者返回任何内容。虽然您自己的myActor确实会回复发送者,但在内部,传入的消息是由akka-streams内部发送的,它会忽略任何回复。毕竟,流中的消息总是在一个方向上流动(另一个方向是为需求保留的)。
您可以做的是通过消息将引用传递给原始参与者:
class Sender(myActor: ActorRef)
(implicit materializer: Materializer) extends Actor {
val throttler = Source.actorRef[(MyActor.Message, ActorRef)](bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.to(Sink.actorRef(myActor, NotUsed))
.run()
override def receive: Receive = {
case DoIt =>
throttler ! (MyActor.Message("hello"), self)
case response: String =>
println(s"received response: $response")
}
}
class MyActor extends Actor {
override def receive: Receive = {
case (MyActor.Message(msg), originalSender: ActorRef) =>
val date = LocalDateTime.now()
println(s"getting: $msg @ $date from $originalSender")
originalSender ! s"$msg ack"
}
}
object MyActor {
case class Message(msg: String)
}当然,为了扩展,将原始发送者合并到消息类中可能更好,但基本方法保持不变。
https://stackoverflow.com/questions/45419519
复制相似问题