首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >阿克卡流,打破元组项目分开?

阿克卡流,打破元组项目分开?
EN

Stack Overflow用户
提问于 2018-07-14 15:54:43
回答 3查看 403关注 0票数 1

使用来自superPoolakka-http,我有一个传递元组的流。我想把它输到Alpakka Google Pub/Sub连接器。在HTTP处理结束时,我对发布/子连接器的所有内容进行编码,并以

代码语言:javascript
复制
(PublishRequest, Long) // long is a timestamp

但是连接器的接口是

代码语言:javascript
复制
Flow[PublishRequest, Seq[String], NotUsed]

第一种方法是杀死一部分:

代码语言:javascript
复制
  .map{ case(publishRequest, timestamp) => publishRequest }
  .via(publishFlow)

在保留Long信息的同时,是否有一种创建该管道的优雅方法?

编辑:在答案中添加了我不那么优雅的解决方案。更多答案欢迎。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-07-14 15:56:18

我不太优雅的解决方案是使用自定义流来重新组合事物:

代码语言:javascript
复制
val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>

  val bc = b.add(Broadcast[(PublishRequest, Long)](2))
  val publisher = b.add(Flow[(PublishRequest, Long)]
    .map { case (pr, _) => pr }
    .via(publishFlow))
  val zipper = b.add(Zip[Seq[String], Long]). 

  bc.out(0) ~> publisher ~> zipper.in0
  bc.out(1).map { case (pr, long) => long } ~> zipper.in1

  FlowShape(bc.in, zipper.out)

})
票数 0
EN

Stack Overflow用户

发布于 2018-07-15 00:58:12

我没有发现使用GraphDSL.create()的解决方案有什么不雅之处,我认为它具有通过图表~>子句可视化流结构的优点。我看到你的代码有问题了。例如,我认为publisher不应该由add-ing定义--一个流向构建器的流。

下面是我认为publishAndRecombine应该是什么样的一个骨架版本(经过了简要测试):

代码语言:javascript
复制
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] = ???

val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val bcast = b.add(Broadcast[(PublishRequest, Long)](2))
  val zipper = b.add(Zip[Seq[String], Long])

  val publisher = Flow[(PublishRequest, Long)].
    map{ case (pr, _) => pr }.
    via(publishFlow)
  val timestamp = Flow[(PublishRequest, Long)].
    map{ case (_, ts) => ts }

  bcast.out(0) ~> publisher ~> zipper.in0
  bcast.out(1) ~> timestamp ~> zipper.in1

  FlowShape(bcast.in, zipper.out)
})
票数 2
EN

Stack Overflow用户

发布于 2022-03-02 22:41:48

现在有一个更好的解决方案,将在Akka 2.6.19中发布(参见https://github.com/akka/akka/pull/31123)。

为了使用已格式化的unsafeViaData,您首先必须使用FlowWithContext/SourceWithContext来表示(PublishRequest, Long)FlowWithContext/SourceWithContext是专门为解决这个问题而设计的抽象(参见https://doc.akka.io/docs/akka/current/stream/stream-context.html)。问题是,您有一个数据部分的流,这通常是您想要操作的部分(在您的例子中是ByteString),然后是上下文(也称为元数据)部分,您通常只是传递未经修改的部分(在您的例子中是Long)。

所以最终你会得到这样的东西

代码语言:javascript
复制
val myFlow: FlowWithContext[PublishRequest, Long, PublishRequest, Long, NotUsed] =
    FlowWithContext.fromTuples(originalFlowAsTuple) // Original flow that has `(PublishRequest, Long)` as an output

myFlow.unsafeViaData(publishFlow)

阿克卡流,打破元组项目分开?不同的是,这个解决方案不仅涉及更少的样板,因为它的部分akka,但它也保留了物化的价值,而不是失去它,总是以一个NotUsed结束。

对于那些想知道为什么unsafeViaData方法的名称中有unsafe的人来说,这是因为您传递给该方法的Flow不能添加、删除或重新排序流中的任何元素(这样做意味着上下文不再正确地对应于流的数据部分)。理想情况下,我们将使用Scala的类型系统在编译时捕获此类错误,但这样做将需要对akka流进行大量更改,特别是如果更改需要保持向后兼容性(在处理akka时是这样做的)。更多细节见前面提到的公关。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51341038

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档