我想使用akka Streams的ActorRefSource构建一个项目序列。所述源被连续地馈送数据。计算完成后,Stream将以毒丸终止。
下面这个简单的例子说明了我的意图:
val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
.mapMaterializedValue{ ref =>
for(i <- 1 to 1000) {
ref ! i
}
ref ! PoisonPill
}
source.runWith(Sink.seq).foreach(s => println("count: "+s.size))我期望Stream处理所有1000个元素,然后由于接收到毒丸而终止。不幸的是,Stream通常会提前终止。示例输出为:
count: 24在发送毒丸之前等待一段时间,例如,1000毫秒会导致所有号码都被处理。
任何关于如何确保所有物品在收到毒丸之前都已处理的想法都将不胜感激。
发布于 2016-08-31 18:30:33
参见the documentation for Source.actorRef:PoisonPill在终止流之前不会刷新缓冲区。
https://stackoverflow.com/questions/39242286
复制相似问题