我目前正在尝试使用Monix来控制api请求。我试过使用STTP的Monix后端,它工作得很好,直到我完成后无法关闭Monix后端.由于这看起来更像是sttp问题,而不是Monix问题,我试图通过使用sttp的默认后端来重新解决这个问题,同时仍然使用Monix进行节流。
我主要是挣扎于关闭monix后端,一旦我用完了可观察到的
我试图通过以下方式简化这一问题:
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val obs: Observable[CancelableFuture[Int]] = Observable
.fromIterable(someIter)
.throttle(3.second, 1)
.map(_.runToFuture)但是,我仍然不确定如何在使用可观察到的程序之后关闭程序,因为它在这里过早终止(不像monix后端情况).
换句话说,在可观察的可迭代性完成之前,我如何阻止终止程序?
发布于 2020-08-10 20:16:05
您可以创建Promise,在Observable由.doOnComplete完成时完成它
在主线上等待它。
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val promise = Promise()
val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
.map(_.runToFuture)
.doOnComplete(Task { promise.complete(Success()) })
Await.ready(promise.future, Duration.Inf)发布于 2020-08-12 19:42:56
除了阿特姆接受的答案之外,并有来自Monix Gitter社区的洞察力,另一个潜在的实现可能是:
val someIter = List(Task(1), Task(2))
val obs =
Observable
.fromIterable(someIter)
.throttle(1 second, 10)
.mapParallelUnordered(10)(x => x.map(x => x.send().body)) // Here we send requests
.sumL // Sum just as an example
.runSyncUnsafe()https://stackoverflow.com/questions/63344249
复制相似问题