使用来自superPool的akka-http,我有一个传递元组的流。我想把它输到Alpakka Google Pub/Sub连接器。在HTTP处理结束时,我对发布/子连接器的所有内容进行编码,并以
(PublishRequest, Long) // long is a timestamp但是连接器的接口是
Flow[PublishRequest, Seq[String], NotUsed]第一种方法是杀死一部分:
.map{ case(publishRequest, timestamp) => publishRequest }
.via(publishFlow)在保留Long信息的同时,是否有一种创建该管道的优雅方法?
编辑:在答案中添加了我不那么优雅的解决方案。更多答案欢迎。
发布于 2018-07-14 15:56:18
我不太优雅的解决方案是使用自定义流来重新组合事物:
val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
val bc = b.add(Broadcast[(PublishRequest, Long)](2))
val publisher = b.add(Flow[(PublishRequest, Long)]
.map { case (pr, _) => pr }
.via(publishFlow))
val zipper = b.add(Zip[Seq[String], Long]).
bc.out(0) ~> publisher ~> zipper.in0
bc.out(1).map { case (pr, long) => long } ~> zipper.in1
FlowShape(bc.in, zipper.out)
})发布于 2018-07-15 00:58:12
我没有发现使用GraphDSL.create()的解决方案有什么不雅之处,我认为它具有通过图表~>子句可视化流结构的优点。我看到你的代码有问题了。例如,我认为publisher不应该由add-ing定义--一个流向构建器的流。
下面是我认为publishAndRecombine应该是什么样的一个骨架版本(经过了简要测试):
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] = ???
val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[(PublishRequest, Long)](2))
val zipper = b.add(Zip[Seq[String], Long])
val publisher = Flow[(PublishRequest, Long)].
map{ case (pr, _) => pr }.
via(publishFlow)
val timestamp = Flow[(PublishRequest, Long)].
map{ case (_, ts) => ts }
bcast.out(0) ~> publisher ~> zipper.in0
bcast.out(1) ~> timestamp ~> zipper.in1
FlowShape(bcast.in, zipper.out)
})发布于 2022-03-02 22:41:48
现在有一个更好的解决方案,将在Akka 2.6.19中发布(参见https://github.com/akka/akka/pull/31123)。
为了使用已格式化的unsafeViaData,您首先必须使用FlowWithContext/SourceWithContext来表示(PublishRequest, Long)。FlowWithContext/SourceWithContext是专门为解决这个问题而设计的抽象(参见https://doc.akka.io/docs/akka/current/stream/stream-context.html)。问题是,您有一个数据部分的流,这通常是您想要操作的部分(在您的例子中是ByteString),然后是上下文(也称为元数据)部分,您通常只是传递未经修改的部分(在您的例子中是Long)。
所以最终你会得到这样的东西
val myFlow: FlowWithContext[PublishRequest, Long, PublishRequest, Long, NotUsed] =
FlowWithContext.fromTuples(originalFlowAsTuple) // Original flow that has `(PublishRequest, Long)` as an output
myFlow.unsafeViaData(publishFlow)与阿克卡流,打破元组项目分开?不同的是,这个解决方案不仅涉及更少的样板,因为它的部分akka,但它也保留了物化的价值,而不是失去它,总是以一个NotUsed结束。
对于那些想知道为什么unsafeViaData方法的名称中有unsafe的人来说,这是因为您传递给该方法的Flow不能添加、删除或重新排序流中的任何元素(这样做意味着上下文不再正确地对应于流的数据部分)。理想情况下,我们将使用Scala的类型系统在编译时捕获此类错误,但这样做将需要对akka流进行大量更改,特别是如果更改需要保持向后兼容性(在处理akka时是这样做的)。更多细节见前面提到的公关。
https://stackoverflow.com/questions/51341038
复制相似问题