我有多个流(处理从queue收到的消息)要执行,每个流之后,我需要检查前一个流中是否有错误,如果有,则过滤掉正在处理的消息,否则继续下一个流。
目前,我必须在每个其他流之后显式地插入这个错误处理程序流。有没有什么方法可以用一些功能来完成这一点,其中这个错误流可以配置为在每个其他流之后运行。有没有其他更好的方法来做到这一点?
示例:
流程1 ->验证报文,如果报错,将报文标记为报错
error flow ->检查消息是否标记为error,如果标记为error,则过滤,否则继续。
流2 ->将消息持久化到db,在出错的情况下标记。
error flow ->检查消息是否标记为错误,如果标记为错误,则进行筛选,否则继续
flow 3、->等等。
或者有没有办法包装(流1+错误流),(流2 ->错误流)?
发布于 2020-08-05 19:55:44
我不确定这是否完全符合您的要求,但我有一种解决方案。可以做的是创建所有流,例如,我们可以查看:
val flows = Seq (
Flow.fromFunction[Int, Int](x => { println(s"flow1: Received $x"); x * 2 }),
Flow.fromFunction[Int, Int](x => { println(s"flow2: Received $x"); x + 1}),
Flow.fromFunction[Int, Int](x => { println(s"flow3: Received $x"); x * x})
)然后,我们需要将错误处理附加到每个现有的流中。因此,让我们定义它,并将其添加到每个元素:
val errorHandling = Flow[Int].filter(_ % 2 == 0)
val errorsHandledFlows = flows.map(flow => flow.via(errorHandling))现在,我们需要一个helper函数,它将连接我们所有的新流:
def connectFlows(errorsHandledFlows: Seq[Flow[Int, Int, _]]): Flow[Int, Int, _] = {
errorsHandledFlows match {
case Seq() => Flow[Int] // This line is actually redundant, But I don't want to have an unexhausted pattern matching
case Seq(singleFlow) => singleFlow
case head :: tail => head.via(connectFlows(tail))
}
}现在,我们需要一起执行,例如:
Source(1 to 4).via(connectFlows(errorsHandledFlows)).to(Sink.foreach(println)).run()将提供以下输出:
flow1: Received 1
flow2: Received 2
flow1: Received 2
flow2: Received 4
flow1: Received 3
flow2: Received 6
flow1: Received 4
flow2: Received 8如你所知,我们过滤了奇数。因此,第一个流获取从1到4的所有数字。第二个流接收2,4,6,8 (第一个流将值乘以2),最后一个流没有接收任何流,因为第二个流将所有值都设为奇数。
发布于 2020-08-06 00:50:59
您还可以使用Merge
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val merge = builder.add(Merge[Int](3))
val flow1 = ...
val flow2 = ...
val flow3 = ...
flow1 ~> merge
flow2 ~> merge
flow3 ~> merge
ClosedShape
})不确定它是否满足您的需求,只是显示替代方案。
https://stackoverflow.com/questions/63261929
复制相似问题