假设我有一个接收消息的工作者,做一些处理并返回一个结果。我有一系列的信息需要转换成一系列的结果:
object Test {
case class Message(str: String)
case class Result(str: String)
class Worker extends Actor {
def receive = {
case Message(data) =>
println("Sleeping: " + data)
Thread.sleep(10000)
val result = Result(data + " - result")
println("Sending result: " + result)
sender ! result
}
}
def test(messages: Seq[Message]): Future[Seq[Result]] = {
val worker = ActorSystem().actorOf(Props(new Worker))
val results = messages.map { m =>
implicit val timeout = Timeout(20 seconds)
println("Sending: " + m)
val result = worker ? m
result.asInstanceOf[Future[Result]]
}
Future.sequence(results)
}
def main(args: Array[String]): Unit = {
val messages: Seq[Message] = args.map(Message(_))
test(messages).foreach { r =>
println("Result: " + r)
}
}}
如果我只以"message-1“作为参数运行上面的内容,那么它运行的很好,输出如下:
发送:消息(消息-1)睡眠:消息-1发送结果:结果(消息-1-结果)结果:ArraySeq(结果(消息-1-结果))
但是,如果我这样做的话:" message -1“"message-2”"message-3“然后最后一条消息被发送到deadLetters:
发送:消息(消息-1)发送:消息(消息-2)睡眠:消息-1发送:消息(消息-3) 发送结果:结果(消息-1-结果) 睡眠:信息-2 发送结果:结果(消息-2-结果) 睡眠:信息-3 发送结果:结果(消息-3-结果) 信息默认-akka.actor.default-Dispatcher-2消息util.Tester$Result从Actorakka://default/user/$a#1776546850到Actorakka://default/死信没有传递。遇到1封死信。这个日志可以关闭或调整配置设置‘akka.log-死-字母’和‘akka.log-死信-在关闭。
我猜这是因为在发送最后一条消息时,我的调用线程已经超出了范围。如何将所有结果正确地收集到一个序列中?
请注意,将我的测试方法更改为下面将得到相同的结果:
def test(messages: Seq[Message]): Future[Seq[Result]] = {
val worker = ActorSystem().actorOf(Props(new Worker))
Future.traverse(messages) { m =>
implicit val timeout = Timeout(20 seconds)
println("Sending: " + m)
val result = worker ? m
result.asInstanceOf[Future[Result]]
}
}发布于 2016-07-15 08:02:57
似乎是因为我的超时时间太低了。应该足够大,足以涵盖所有的工作-例如40秒。
发布于 2016-07-14 19:52:55
愚蠢的回答是:
Future.traverse(messages)(m => actor ? m).map(_.asInstanceOf[Result])但最好是同时发送数据:
class Worker extends Actor {
def receive = {
case Message(data) =>
// Convert data into result
...
sender ! result
case seq: Seq[Message] =>
...
sender ! results
}
}https://stackoverflow.com/questions/38382562
复制相似问题