我目前正在构建一个从mongoDb到elasticsearch的数据流解决方案。我的目标是跟踪所有成功传输到elasticsearch的项目。我正在使用akka-streams和elastic4s。目前,进入es的流如下所示
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
batchSize = batchSize,
completionFn = { () => elasticFinishPromise.success(()); ()},
errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
concurrentRequests = concurrentRequests
)
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)从我的来源看,大概是这样的:
val a: [NotUsed] = mongoSrc
.via(some operations..)
.to(esSink)
.run()现在一切正常,现在我正在用第二个接收器记录项目计数。但我更愿意记录真正传输到elasticsearch的项目。elastic4s订阅者提供了一个带有onAck(): Unit和onFailure(): Unit的listener: ResponseListener,我希望像这样将此信息返回到流中
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item
mongoSrc ~> doStuff ~> esSink ~> logSink我该如何实现它呢?我需要一个自定义的stage来缓冲onAck和onFailure的元素吗?还是有一种更简单的方法?
谢谢你的帮助。
发布于 2016-07-31 01:38:33
你可以通过利用Flow.fromSinkAndSource来“漂浮”你的Subscriber[T]接收器。查看来自the docs的“复合流(来自汇点和源)”插图。
在本例中,您将附加自定义actorPublisher作为源,并从onAck()向其发送消息。
既然你想要一个更简单的方法:
val doStuff = Flow[DocToIndex]
.grouped(batchSize)
.mapAsync(concurrentRequests)(bulkopFuture)简而言之,抛开所有有用的抽象概念不谈,elastic4s订阅者就是a bulk update request。
https://stackoverflow.com/questions/38565419
复制相似问题