首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Monix任务处理期货列表失败

Monix任务处理期货列表失败
EN

Stack Overflow用户
提问于 2019-10-20 07:05:27
回答 2查看 531关注 0票数 1

如何处理异步执行任务期间的失败?也就是说,至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入>5。

代码语言:javascript
复制
val things = Range(1, 40)
  implicit val scheduler = monix.execution.Scheduler.global
  def t(i:Int) = Task.eval {
      Try{
        Thread.sleep(1000)
        val result = i + 1
        if(result > 5){
          throw new Exception("asdf")
        }
        // i.e. write to file, that's why unit is returned
        println(result) // Effect
        "Result"
      }
    }
    val futures = things.map(e=> t(e))
  futures.foreach(_.runToFuture)

编辑

试着:

代码语言:javascript
复制
futures.foreach(_.runToFuture.onComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.err.println(ex)
      System.exit(1)
  })

不会停止计算。如何记录堆栈跟踪并取消正在进行的计算并停止?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-11-16 20:57:32

更惯用的方法是使用Observable而不是Task,因为它处理的是数据列表(我假设这是用例,因为它在问题中显示了)。

代码语言:javascript
复制
 val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
    else Task.delay(println(i)) // Or write to file in your case
  )
  .completedL
  .runToFuture


obs
  .recover {
    case NonFatal(e) => println("Error")
  }

或者,您也可以使用Either发出错误信号,这将导致更好的类型安全性,因为您需要处理Either结果。

代码语言:javascript
复制
val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.pure(Left("Error"))
    else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
  )
  .takeWhileInclusive(_.isRight) // will also emit the failing result
  .lastL
  .runToFuture


obs.map {
  case Left(err) => println("There's an error")
  case _ => println("Completed successfully")
}
票数 1
EN

Stack Overflow用户

发布于 2019-10-22 00:45:06

这个问题有两个部分:

  • 使任务可取消。
  • 当一个任务失败时取消兄弟姐妹。

使任务可取消

Monix有BooleanCancelable,它允许您在调用isCancelled时将cancel的结果设置为true

cancel还需要在Thread.interrupt运行时调用Thread.interrupt来唤醒它。否则,sleep将通过它的过程。但是,这将在任务中抛出InterruptedException。这需要处理。

取消兄弟姐妹

CompositeCancelable。它似乎是CompositeCancellable的用例,它用于从父任务调用cancel。因此,一旦CompositeCancellable建成(即。(所有任务都是构造的):

  • 必须为每个任务提供对此的引用,这样失败的任务就可以对其调用cancel。(请注意,这是一种循环引用,最好避免)
  • 当子任务失败并调用cancel时,会通知父任务(或代码)。(这将避免循环引用)

通知同级任务的另一种方法是使用AtomicBoolean并经常检查它(睡眠10毫秒而不是1000毫秒)。当一个任务失败时,它将设置这个布尔值,以便其他任务可以停止它们的执行。当然,这不涉及Cancellable。(这是一种黑客,最好使用monix调度程序)

备注

Thread.sleep中调用Task是个好主意吗?我认为这将阻止另一项任务使用该线程。我认为使用调度器来添加延迟和组合这些子任务是最有效地利用线程池的方法。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58470766

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档