首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka流通过流限制并行/处理流的吞吐量

Akka流通过流限制并行/处理流的吞吐量
EN

Stack Overflow用户
提问于 2019-03-18 11:06:00
回答 1查看 539关注 0票数 6

我有一个用例,我想向外部系统发送一条消息,但是发送此消息的流接受并返回一种我不能在下游使用的类型。这是通过流的一个很好的用例。我正在使用实现这里。最初,我担心如果processingFlow使用mapAsyncUnordered,那么这个流就无法工作。因为处理流可能会重新排序消息,而zip可能会用不正确的对推出一个元组。例如,在下面的示例中。

代码语言:javascript
复制
  val testSource = Source(1 until 50)
  val processingFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(10)(x => Future {
    Thread.sleep(Random.nextInt(50))
    x * 10
  })
  val passThroughFlow = PassThroughFlow(processingFlow, Keep.both)

  val future = testSource.via(passThroughFlow).runWith(Sink.seq)

我希望处理流程能够根据其输入重新排序其输出,我将得到如下结果:

代码语言:javascript
复制
[(30,1), (40,2),(10,3),(10,4), ...]

在右边(传递的内容总是有序的),但是左边,通过我的mapAsyncUnordered,可能会加入一个不正确的元素,从而形成一个糟糕的元组。

实际上,我得到的是:

代码语言:javascript
复制
[(10,1), (20,2),(30,3),(40,4), ...]

每次都是这样。经过进一步的研究,我注意到代码运行缓慢,实际上它根本没有并行运行,尽管我的map异步无序。我尝试在前后引入一个缓冲区,以及一个异步边界,但它似乎总是按顺序运行。这解释了为什么它总是有序的,但我希望我的处理流程具有更高的吞吐量。

我做了以下的工作:

代码语言:javascript
复制
object PassThroughFlow {

  def keepRight[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth[A, A1](processingFlow).map(_._2)

  def keepBoth[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder => {
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[A](2))
      val zip = builder.add(ZipWith[A1, A, (A1, A)]((left, right) => (left, right)))

      broadcast.out(0) ~> processingFlow ~> zip.in0
      broadcast.out(1) ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    }
    })
}

object ParallelPassThroughFlow {


  def keepRight[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth(parallelism, processingFlow).map(_._2)

  def keepBoth[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val fanOut = builder.add(Balance[A](outputPorts = parallelism))
      val merger = builder.add(Merge[(A1, A)](inputPorts = parallelism, eagerComplete = false))

      Range(0, parallelism).foreach { n =>
        val passThrough = PassThroughFlow.keepBoth(processingFlow)
        fanOut.out(n) ~> passThrough ~> merger.in(n)
      }

      FlowShape(fanOut.in, merger.out)
    })
  }

}

两个问题:

  1. 原始实施中,为什么通过流中的zip限制映射异步无序的并行性?
  2. 我的作品是声音的,还是可以改进的?我基本上把我的输入输入到多个通过流的堆栈中,并将它们合并起来。它似乎有我想要的属性(并行但保持顺序,即使处理流程重新排序),但是有些东西感觉不对
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-20 20:25:47

您所看到的行为是broadcastzip如何工作的结果:broadcast在其所有输出信号需求时向下游发出信号;zip在信令需求之前等待其所有输入(并向下游发射)。

代码语言:javascript
复制
broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1

考虑第一个元素(1)通过上面的图的移动。1广播给processingFlowzipzip立即接收到它的一个输入(1),并等待它的另一个输入(10),这将需要稍长的时间才能到达。只有当zip同时获得110时,它才会从上游提取更多的元素,从而触发第二个元素(2)在流中的移动。诸若此类。

至于你的ParallelPassThroughFlow,我不知道为什么你“感觉不对劲”。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55219937

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档