我正在学习Akka,并试图理解GraphStage的工作原理,但我正在为物化价值的使用而奋斗。
我想帮助解决以下问题:
如何消费第一流到第二流的物化价值?
如果有的话,请提供指针。
谢谢
发布于 2018-01-03 14:12:38
我花了一些时间才把你的问题想清楚。简而言之,我的回答是:根据我对阿克卡流的理解,你的问题是毫无意义的。下面是一个扩展的解释。此外,我的知识是有限的,所以如果我有错误的假设-请评论,让我们一起学习。
在考虑如何消费Flow的物化值之前,我们首先需要有一个Flow示例,它的物化值与NotUsed不同--后者用于大多数开箱即用的流。因此,我编写了一个简单的Flow,它只通过所有上游元素,但有一个计数器嵌入,它计算传递的总元素。下面是代码:
class FlowWithCounter extends GraphStageWithMaterializedValue[FlowShape[Int, Int], Future[Int]]{
val in:Inlet[Int] = Inlet("Inlet_of_FlowWithCounter")
val out:Outlet[Int] = Outlet("Outlet of FlowWithCounter")
override def shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
val materializedValue = Promise[Int]()
val logic = new GraphStageLogic(shape) {
private var counter: Int = 0
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
counter += 1
push(out, elem)
}
override def onUpstreamFinish(): Unit = {
materializedValue.success(counter)
super.onUpstreamFinish()
}
}
)
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
(logic, materializedValue.future)
}
}如您所见,物化值类型是FutureInt。因此,当流完成时,物化程序将能够获取这个值并使用它,如果我们指示它这样做的话。
现在很少有重要的观察结果:
createLogicAndMaterializedValue方法将不会被流Materializer调用。因此,我们应该在那一刻返回一些指针,它将在流完成时刻保存物化值。这通常是以Future[T]的形式自然完成的。在我们的示例中,这意味着我们只能访问new GraphStageLogic(shape) {...}构造函数中的new GraphStageLogic(shape) {...}变量。我们不能在返回counter对的行中引用(logic, materializedValue.future)本身。
我还没有在DSL中找到任何查询流组件的物化值的方法。只有借助toMat、viaMat等方法,我们才能告诉物化器向下游传播组件的物化值。只有借助run等方法,我们才能最终提取出整个流的最终物化值。
注意到我们的示例流使用counter值完成了在
override def onUpstreamFinish(): Unit = {
materializedValue.success(counter)
super.onUpstreamFinish()
}当上游完成时。
下面是我们如何连接这个示例流,在流阶段传播它的物化值,并在整个流完成时使用它:
import akka.stream.stage._
object Main {
import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
class FlowWithCounter extends GraphStageWithMaterializedValue[FlowShape[Int, Int], Future[Int]]{....}
def main(args:Array[String]):Unit = {
val source:Source[Int, NotUsed] = Source(15 to 25)
val flow:Flow[Int, Int, Future[Int]] = Flow.fromGraph(new FlowWithCounter)
val sink:Sink[Int, Future[Done]] = Sink.foreach(println)
val completion:(Future[Int], Future[Done]) = ((source.viaMat(flow)(Keep.right)).toMat(sink)(Keep.both)).run()
completion._2 foreach{_ =>
completion._1 foreach{count =>
println("Our flow processed " + count + " elements.")
system.terminate()
}
}
}
}因此,我们的流程的物化值只有在阶段完成时才可用。Flow组件不知道上游源和下游接收器的物化值。据我所知,只有物化器和流解释器才能获取这些物化值并适当地传播它们。
也许你的问题对无限流有一定的意义,我可以想象第二流需要在某种逻辑条件下检查第一流中嵌入的计数器的情况,但我不知道如何实现。也许有人会用其他答案来解释。
https://stackoverflow.com/questions/48057717
复制相似问题