首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Akka流将Flow1物化值传递和使用到Flow2

如何使用Akka流将Flow1物化值传递和使用到Flow2
EN

Stack Overflow用户
提问于 2018-01-02 08:23:41
回答 1查看 977关注 0票数 1

我正在学习Akka,并试图理解GraphStage的工作原理,但我正在为物化价值的使用而奋斗。

我想帮助解决以下问题:

如何消费第一流到第二流的物化价值?

如果有的话,请提供指针。

谢谢

EN

回答 1

Stack Overflow用户

发布于 2018-01-03 14:12:38

我花了一些时间才把你的问题想清楚。简而言之,我的回答是:根据我对阿克卡流的理解,你的问题是毫无意义的。下面是一个扩展的解释。此外,我的知识是有限的,所以如果我有错误的假设-请评论,让我们一起学习。

在考虑如何消费Flow的物化值之前,我们首先需要有一个Flow示例,它的物化值与NotUsed不同--后者用于大多数开箱即用的流。因此,我编写了一个简单的Flow,它只通过所有上游元素,但有一个计数器嵌入,它计算传递的总元素。下面是代码:

代码语言:javascript
复制
class FlowWithCounter extends GraphStageWithMaterializedValue[FlowShape[Int, Int], Future[Int]]{
    val in:Inlet[Int] = Inlet("Inlet_of_FlowWithCounter")
    val out:Outlet[Int] = Outlet("Outlet of FlowWithCounter")

    override def shape: FlowShape[Int, Int] = FlowShape(in, out)

    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
      val materializedValue = Promise[Int]()
      val logic = new GraphStageLogic(shape) {
        private var counter: Int = 0

        setHandler(
          in,
          new InHandler {
            override def onPush(): Unit = {
              val elem = grab(in)
              counter += 1
              push(out, elem)
            }

            override def onUpstreamFinish(): Unit = {
              materializedValue.success(counter)
              super.onUpstreamFinish()
            }
          }
        )

        setHandler(out, new OutHandler {
          override def onPull(): Unit = {
            pull(in)
          }
        })
      }
      (logic, materializedValue.future)
    }
  }

如您所见,物化值类型是FutureInt。因此,当流完成时,物化程序将能够获取这个值并使用它,如果我们指示它这样做的话。

现在很少有重要的观察结果:

  1. 当我们运行流时,createLogicAndMaterializedValue方法将不会被流Materializer调用。因此,我们应该在那一刻返回一些指针,它将在流完成时刻保存物化值。这通常是以Future[T]的形式自然完成的。
  2. 正如所指出的,这里: //所有状态必须在GraphStageLogic中, //绝不在封闭的GraphStage中。 //此状态可以安全地访问和修改 //回调,由GraphStageLogic和 //已登记的处理程序。

在我们的示例中,这意味着我们只能访问new GraphStageLogic(shape) {...}构造函数中的new GraphStageLogic(shape) {...}变量。我们不能在返回counter对的行中引用(logic, materializedValue.future)本身。

我还没有在DSL中找到任何查询流组件的物化值的方法。只有借助toMatviaMat等方法,我们才能告诉物化器向下游传播组件的物化值。只有借助run等方法,我们才能最终提取出整个流的最终物化值。

注意到我们的示例流使用counter值完成了在

代码语言:javascript
复制
override def onUpstreamFinish(): Unit = {
  materializedValue.success(counter)
  super.onUpstreamFinish()
}

当上游完成时。

下面是我们如何连接这个示例流,在流阶段传播它的物化值,并在整个流完成时使用它:

代码语言:javascript
复制
import akka.stream.stage._

object Main {
  import scala.concurrent._
  import akka._
  import akka.actor._
  import akka.stream._
  import akka.stream.scaladsl._

  implicit val system = ActorSystem("TestSystem")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  class FlowWithCounter extends GraphStageWithMaterializedValue[FlowShape[Int, Int], Future[Int]]{....}

  def main(args:Array[String]):Unit = {
    val source:Source[Int, NotUsed] = Source(15 to 25)
    val flow:Flow[Int, Int, Future[Int]] = Flow.fromGraph(new FlowWithCounter)
    val sink:Sink[Int, Future[Done]] = Sink.foreach(println)

    val completion:(Future[Int], Future[Done]) = ((source.viaMat(flow)(Keep.right)).toMat(sink)(Keep.both)).run()
    completion._2 foreach{_ =>
      completion._1 foreach{count =>
        println("Our flow processed " + count + " elements.")
        system.terminate()
      }
    }
  }

}

因此,我们的流程的物化值只有在阶段完成时才可用。Flow组件不知道上游源和下游接收器的物化值。据我所知,只有物化器和流解释器才能获取这些物化值并适当地传播它们。

也许你的问题对无限流有一定的意义,我可以想象第二流需要在某种逻辑条件下检查第一流中嵌入的计数器的情况,但我不知道如何实现。也许有人会用其他答案来解释。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48057717

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档