首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有没有办法在每个其他流之后执行任何特定的流,而无需显式地插入它

有没有办法在每个其他流之后执行任何特定的流,而无需显式地插入它
EN

Stack Overflow用户
提问于 2020-08-05 17:12:00
回答 2查看 49关注 0票数 0

我有多个流(处理从queue收到的消息)要执行,每个流之后,我需要检查前一个流中是否有错误,如果有,则过滤掉正在处理的消息,否则继续下一个流。

目前,我必须在每个其他流之后显式地插入这个错误处理程序流。有没有什么方法可以用一些功能来完成这一点,其中这个错误流可以配置为在每个其他流之后运行。有没有其他更好的方法来做到这一点?

示例:

流程1 ->验证报文,如果报错,将报文标记为报错

error flow ->检查消息是否标记为error,如果标记为error,则过滤,否则继续。

流2 ->将消息持久化到db,在出错的情况下标记。

error flow ->检查消息是否标记为错误,如果标记为错误,则进行筛选,否则继续

flow 3、->等等。

或者有没有办法包装(流1+错误流),(流2 ->错误流)?

EN

回答 2

Stack Overflow用户

发布于 2020-08-05 19:55:44

我不确定这是否完全符合您的要求,但我有一种解决方案。可以做的是创建所有流,例如,我们可以查看:

代码语言:javascript
复制
val flows = Seq (
  Flow.fromFunction[Int, Int](x => { println(s"flow1: Received $x"); x * 2 }),
  Flow.fromFunction[Int, Int](x => { println(s"flow2: Received $x"); x + 1}),
  Flow.fromFunction[Int, Int](x => { println(s"flow3: Received $x"); x * x})
)

然后,我们需要将错误处理附加到每个现有的流中。因此,让我们定义它,并将其添加到每个元素:

代码语言:javascript
复制
val errorHandling = Flow[Int].filter(_ % 2 == 0)
val errorsHandledFlows = flows.map(flow => flow.via(errorHandling))

现在,我们需要一个helper函数,它将连接我们所有的新流:

代码语言:javascript
复制
def connectFlows(errorsHandledFlows: Seq[Flow[Int, Int, _]]): Flow[Int, Int, _] = {
  errorsHandledFlows match {
    case Seq() => Flow[Int] // This line is actually redundant, But I don't want to have an unexhausted pattern matching
    case Seq(singleFlow) => singleFlow
    case head :: tail => head.via(connectFlows(tail))
  }
}

现在,我们需要一起执行,例如:

代码语言:javascript
复制
Source(1 to 4).via(connectFlows(errorsHandledFlows)).to(Sink.foreach(println)).run()

将提供以下输出:

代码语言:javascript
复制
flow1: Received 1
flow2: Received 2
flow1: Received 2
flow2: Received 4
flow1: Received 3
flow2: Received 6
flow1: Received 4
flow2: Received 8

如你所知,我们过滤了奇数。因此,第一个流获取从1到4的所有数字。第二个流接收2,4,6,8 (第一个流将值乘以2),最后一个流没有接收任何流,因为第二个流将所有值都设为奇数。

票数 0
EN

Stack Overflow用户

发布于 2020-08-06 00:50:59

您还可以使用Merge

代码语言:javascript
复制
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val merge = builder.add(Merge[Int](3))

  val flow1 = ...
  val flow2 = ...
  val flow3 = ...

  flow1 ~> merge
  flow2 ~> merge
  flow3 ~> merge

  ClosedShape
})

不确定它是否满足您的需求,只是显示替代方案。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63261929

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档