我使用图形dsl根据我看到的一些示例代码创建了一些流处理作业。一切运行良好,我只是很难理解表示法:(更新为2.4)
def elements: Source[Foos] = ...
def logEveryNSink = // a sink that logs
def cleaner: Flow[Foos, Bars, Unit] = ...
def boolChecker(bar: Bar)(implicit ex: ExecutionContext): Future[Boolean] = ...
val mySink = Sink.foreach[Boolean](println(_))
val lastly = Flow[Bars].mapAsync(2)(x => boolChecker(x).toMat(mySink)(Keep.right)
val materialized = RunnableGraph.fromGraph(
GraphDSL.create(lastly) { implicit builder =>
baz => {
import GraphDSL.Implicits._
val broadcast1 = builder.add(Broadcast[Foos](2))
val broadcast2 = builder.add(Broadcast[Bars](2))
elements ~> broadcast1 ~> logEveryNSink(1)
broadcast1 ~> cleaner ~> broadcast2 ~> baz
~> broadcast2 ~> logEveryNSink(1)
ClosedShape
}
}
).run()我理解包含的隐式构建器,但我不确定baz在{ implicit builder => baz => { ...中代表什么。它只是整个形状的隐式名称吗?
发布于 2016-02-29 13:10:27
GraphDSL.create方法被重载以接受许多输入形状的变体(包括0)。如果您没有传递初始形状,那么buildBlock函数arg的签名(您实际上定义了如何构建图形的主体)如下:
(Builder[NotUsed]) => S这就是一个简单的Function1[Builder[NotUsed], S],也就是一个函数,它接受一个Builder[NotUsed]的实例,并返回一个Shape实例,这是一个最终的图形。这里的NotUsed是Unit的同义词,因为您是说,通过不传递任何输入共享,您不关心正在生成的输出图的物化值。
如果您确实决定传递输入形状,那么该buildBlock函数的签名就会发生一些变化,以容纳输入形状。在您的示例中,您传递的是一个输入形状,因此buildBlock的签名更改为:
(Builder[Mat]) => Graph.Shape => S现在,这实际上是一个Function1[Builder[Mat], Function1[Graph.Shape, S]],或者一个接受Builder[Mat] ( Mat是输入形状的物化值类型)并返回接受Graph.Shape并返回S实例(即Shape)的函数。
长话短说,如果您传递形状,那么还需要将它们声明为图构建块函数上的绑定参数,而是作为第二个输入函数(因此是附加的=>)。
https://stackoverflow.com/questions/35691116
复制相似问题