我正在更新一些实验喷雾代码到akka流(2.0.2),所以我正在慢慢地阅读他的文档。我的需求之一是,如果我在流中检测到协议违规,我需要立即关闭流并启动客户端。
从流内部立即关闭(终止)流的正确方法是什么?
发布于 2016-01-22 20:21:58
使用PushStage
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}您可以通过PushStage方法将Flow与transform组合起来:
Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)https://stackoverflow.com/questions/34954144
复制相似问题