首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将来自elastic4s订户的响应发送回akka-stream

将来自elastic4s订户的响应发送回akka-stream
EN

Stack Overflow用户
提问于 2016-07-25 18:33:32
回答 1查看 348关注 0票数 0

我目前正在构建一个从mongoDb到elasticsearch的数据流解决方案。我的目标是跟踪所有成功传输到elasticsearch的项目。我正在使用akka-streams和elastic4s。目前,进入es的流如下所示

代码语言:javascript
复制
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
    batchSize = batchSize,
    completionFn = { () => elasticFinishPromise.success(()); ()},
    errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
    concurrentRequests = concurrentRequests
    )
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)

从我的来源看,大概是这样的:

代码语言:javascript
复制
val a: [NotUsed] = mongoSrc
  .via(some operations..)
  .to(esSink)
  .run()

现在一切正常,现在我正在用第二个接收器记录项目计数。但我更愿意记录真正传输到elasticsearch的项目。elastic4s订阅者提供了一个带有onAck(): UnitonFailure(): Unitlistener: ResponseListener,我希望像这样将此信息返回到流中

代码语言:javascript
复制
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item

mongoSrc ~> doStuff ~> esSink ~> logSink

我该如何实现它呢?我需要一个自定义的stage来缓冲onAckonFailure的元素吗?还是有一种更简单的方法?

谢谢你的帮助。

EN

回答 1

Stack Overflow用户

发布于 2016-07-31 01:38:33

你可以通过利用Flow.fromSinkAndSource来“漂浮”你的Subscriber[T]接收器。查看来自the docs的“复合流(来自汇点和源)”插图。

在本例中,您将附加自定义actorPublisher作为源,并从onAck()向其发送消息。

既然你想要一个更简单的方法:

代码语言:javascript
复制
val doStuff = Flow[DocToIndex]
                .grouped(batchSize)
                .mapAsync(concurrentRequests)(bulkopFuture)

简而言之,抛开所有有用的抽象概念不谈,elastic4s订阅者就是a bulk update request

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38565419

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档