如何处理异步执行任务期间的失败?也就是说,至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入>5。
val things = Range(1, 40)
implicit val scheduler = monix.execution.Scheduler.global
def t(i:Int) = Task.eval {
Try{
Thread.sleep(1000)
val result = i + 1
if(result > 5){
throw new Exception("asdf")
}
// i.e. write to file, that's why unit is returned
println(result) // Effect
"Result"
}
}
val futures = things.map(e=> t(e))
futures.foreach(_.runToFuture)编辑
试着:
futures.foreach(_.runToFuture.onComplete {
case Success(value) =>
println(value)
case Failure(ex) =>
System.err.println(ex)
System.exit(1)
})不会停止计算。如何记录堆栈跟踪并取消正在进行的计算并停止?
发布于 2019-11-16 20:57:32
更惯用的方法是使用Observable而不是Task,因为它处理的是数据列表(我假设这是用例,因为它在问题中显示了)。
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
else Task.delay(println(i)) // Or write to file in your case
)
.completedL
.runToFuture
obs
.recover {
case NonFatal(e) => println("Error")
}或者,您也可以使用Either发出错误信号,这将导致更好的类型安全性,因为您需要处理Either结果。
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.pure(Left("Error"))
else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
)
.takeWhileInclusive(_.isRight) // will also emit the failing result
.lastL
.runToFuture
obs.map {
case Left(err) => println("There's an error")
case _ => println("Completed successfully")
}发布于 2019-10-22 00:45:06
这个问题有两个部分:
使任务可取消
Monix有BooleanCancelable,它允许您在调用isCancelled时将cancel的结果设置为true。
cancel还需要在Thread.interrupt运行时调用Thread.interrupt来唤醒它。否则,sleep将通过它的过程。但是,这将在任务中抛出InterruptedException。这需要处理。
取消兄弟姐妹
有CompositeCancelable。它似乎是CompositeCancellable的用例,它用于从父任务调用cancel。因此,一旦CompositeCancellable建成(即。(所有任务都是构造的):
cancel。(请注意,这是一种循环引用,最好避免)cancel时,会通知父任务(或代码)。(这将避免循环引用)通知同级任务的另一种方法是使用AtomicBoolean并经常检查它(睡眠10毫秒而不是1000毫秒)。当一个任务失败时,它将设置这个布尔值,以便其他任务可以停止它们的执行。当然,这不涉及Cancellable。(这是一种黑客,最好使用monix调度程序)
备注
在Thread.sleep中调用Task是个好主意吗?我认为这将阻止另一项任务使用该线程。我认为使用调度器来添加延迟和组合这些子任务是最有效地利用线程池的方法。
https://stackoverflow.com/questions/58470766
复制相似问题