首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >限制使用Monix可观测的.onErrorRestartIf时的重试次数?

限制使用Monix可观测的.onErrorRestartIf时的重试次数?
EN

Stack Overflow用户
提问于 2017-07-10 14:23:00
回答 2查看 379关注 0票数 0

Monix可观察到的apis有.onErrorRestartIf(f: Throwable => Boolean).onErrorRestart(times: Int)。如何指定应该重试执行.onErrorRestartIf的最大次数?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-07-10 17:54:08

您可以基于onErrorHandleWith构建自己的循环

代码语言:javascript
复制
def retryLimited[A](fa: Observable[A], maxRetries: Int)
  (p: Throwable => Boolean): Observable[A] = {

  // If we have no retries left, return the source untouched
  if (maxRetries <= 0) fa else
    fa.onErrorHandleWith { err =>
      // If predicate holds, do recursive call
      if (p(err)) 
        retryLimited(fa, maxRetries - 1)(p)
      else 
        Observable.raiseError(err)
    }
}

如果您不喜欢简单的函数(我喜欢),您可以公开一些扩展方法作为替代:

代码语言:javascript
复制
implicit class ObservableExtensions[A](val self: Observable[A]) 
  extends AnyVal {

  def onErrorRetryLimited(maxRetries: Int)
    (p: Throwable => Boolean): Observable[A] = 
    retryLimited(self, maxRetries)(p)
}

注意,@JVS的回答在精神上是可以的,但可能会有问题,因为它保持共享的可变状态,这对于寒冷的可观测值来说是不行的。因此,请注意,如果您这样做会发生什么:

代码语言:javascript
复制
val source = Observable.suspend { 
  if (Random.nextInt() % 10 != 0)
    Observable.raiseError(new RuntimeException("dummy"))
  else
    Observable(1, 2, 3)
} 

val listT = source
  .onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))
  .toListL

listT.runAsync // OK
listT.runAsync // Ooops, shared state, we might not have retries left

注意可观测算子中可变的共享状态。当然,你可以这样工作,但你必须意识到其中的危险:)

票数 3
EN

Stack Overflow用户

发布于 2017-07-10 14:23:00

警告:这使用共享的可变状态,对于可观察的冷值可能是不正确的。见Alexandru的回答。

定义一个函数来完成它:

代码语言:javascript
复制
def limitedRetries(maxRetries: AtomicInt, shouldRetryOnException: Throwable => Boolean): Throwable => Boolean = 
  ex => maxRetries.decrementAndGet() > 0 && shouldRetryOnException(ex)

并在onErrorRestartIf中使用此函数

代码语言:javascript
复制
.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))

FYI,这里用的是单体AtomicInt .

代码语言:javascript
复制
import monix.execution.atomic.AtomicInt
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45014599

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档