在RxJava 1/ RxScala中,如何在以下情况下观察到节气门/背压源?
def fast: Observable[Foo] // Supports backpressure
def afterExpensiveOp: Observable[Bar] =
fast.flatMap(foo => Observable.from(expensiveOp(foo))
// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
if(noResources()) Future.failed(new OutOfResourcesException())
else Future { Bar() }
}一个可能的解决办法就是阻止直到。它可以工作,但这是非常不优雅的,可以防止多个同时进行的请求:
def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo =>
Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)发布于 2016-11-15 09:25:37
flatMap有一个参数来限制并发订阅者的数量。如果你使用这个,flatMap会照顾你的背压。
def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))https://stackoverflow.com/questions/40510116
复制相似问题