首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有没有可能用Akka-stream创建一个流,可以在两个不同的内部形状之间切换?

有没有可能用Akka-stream创建一个流,可以在两个不同的内部形状之间切换?
EN

Stack Overflow用户
提问于 2020-12-03 21:47:49
回答 1查看 117关注 0票数 0

我想有一个复杂的Flow,我可以在2个不同的Shapes之间切换,这取决于流入图形的数据。当我们返回一个ClosedShape时,这个图是静态的,但是当我们返回FlowShape时,我想知道是否有可能在它内部创建某种动态流。我看着this question,似乎他们使用了一个Partition,我不知道如何应用它,或者它是否真的解决了我的问题。

我从这个例子开始,我被代码中的注释卡住了。

代码语言:javascript
复制
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}

import scala.concurrent.duration._

object StreamOpenGraphsWithMultipleFlows extends App {

  run()

  def run() = {
    implicit val system = ActorSystem("StreamOpenGraphsWithMultipleFlows")

    val fastSource = Source(1 to 1000).throttle(50, 1 second)
    val slowSource = Source(1 to 1000).throttle(5, 1 second)
    val INC = 5
    val MULTI = 10
    val DIVIDE = 2

    val incrementer = Flow[Int].map { x =>
      val result = x + INC
      print(s" | incrementing $x + $INC -> $result")
      result
    }
    val multiplier = Flow[Int].map { x =>
      val result = x * MULTI
      print(s" | multiplying $x * $MULTI -> $result")
      result
    }
    val divider = Flow[Int].map { x =>
      val result = x / DIVIDE
      print(s" | dividing $x / $DIVIDE -> $result")
      result
    }

    def isMultipleOf(value: Int, multiple: Int): Boolean = (value % multiple) == 0

    // Step 1 - setting up the fundamental for a stream graph
    val complexFlowIncrementer = Flow.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        // Step 2 - add necessary components of this graph
        val incrementerShape = builder.add(incrementer)
        val multiplierShape = builder.add(multiplier)

        // println(s"builder.materializedValue: ${builder.materializedValue}")

        // Step 3 - tying up the components
        incrementerShape ~> multiplierShape
        // BUT I WOULD LIKE TO DO SOMETHING AS BELOW
        // if (isMultipleOf(value???, 10)) incrementerShape ~> divider
        // else incrementerShape ~> multiplierShape

        // Step 4 - return the shape
        FlowShape(incrementerShape.in, multiplierShape.out)
      }
    )
    // run the graph and materialize it
    val graph = slowSource
      .via(complexFlowIncrementer)
      .to(Sink.foreach(x => println(s" | result: $x")))
    graph.run()
  }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-03 23:02:29

这个blog post展示了如何实现这一点的代码示例,所以在您的例子中,您需要一些类似的东西:

代码语言:javascript
复制
val complexFlowIncrementer = Flow.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        // Step 2 - add necessary components of this graph
        val incrementerShape = builder.add(incrementer)
        val multiplierShape = builder.add(multiplier)
        val dividerShape = builder.add(divider)
        
        //add partition and merge
        val partition = builder.add(Partition[Int](2, if(isMultipleOf(_, 10)) 0 else 1)
        val merge = builder.add(Merge[Int](2))

        // println(s"builder.materializedValue: ${builder.materializedValue}")

        // Step 3 - tying up the components
        incrementerShape ~> partition
        partition.out(0) ~> dividerShape ~> merge.in(0)
        partition.out(1) ~> multiplierShape ~> merge.in(1)
       
        // Step 4 - return the shape
        FlowShape(incrementerShape.in, merge.out)
      }
    )
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65127253

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档