我正在使用scala futures异步提交上千个作业。我还实现了一个由并发阻塞队列支持的ThrottledExecutionContext,因此它一次最多只运行100个作业,并将其余作业放入队列中。这是一个阻塞操作,因为它涉及调用自身的第三方服务。当其中一个抛出异常时,我需要重试整个操作(数千个作业)或跳过整个批处理。当一些期货仍在运行时,我不能重试。我有一种方法可以知道在任何给定的时间点有多少作业在第三方系统(spark)中运行。因此,一旦我捕捉到异常,我想首先杀死所有剩余的期货,清空队列,等待第三方完成该批处理的所有未决作业,然后重试。那么有没有办法在一个异常的情况下杀死所有的期货呢?
根据下面的讨论,我尝试了failFast,但它并没有做我期望的事情。我对Promise还没有更好的理解。但是似乎我们可以用Promise来控制Future的未来!
Scala Future/Promise fast-fail pipeline
var atomicnt = new AtomicInteger() // to track how many jobs were finished when exception occured
def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
val promise = Promise[Seq[T]]
futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}
val res = Future.sequence(futures)
promise.completeWith(res).future
}
def normalTask() = {
println("Starting normaltask")
Thread.sleep(2000 + Random.nextInt(5000))
if(Random.nextDouble() > 0.5) {
println("Throwing random exception..")
throw new RuntimeException("Random exception from normalTask")
}
atomicnt.getAndIncrement
Thread.sleep(2000 + Random.nextInt(5000))
println("Finished normaltask")
}
def testException() = {
val rg = (0 until 500)
val futures = rg.map(i =>{
Future(normalTask)
})
val res = failFast(futures)
Await.result(res, Duration.Inf) //blocking here to wait for all 500 to finish
}
def batchProcessing() {
try {
println("Starting batchProcessing")
testException()
println("Exiting batchProcessing")
} catch {
case t: Throwable => {
println("Error in main")
Thread.sleep(10000) //Here while waiting other futures are still running
t.printStackTrace()
// retry logic goes here based on failure or entire batch will be skipped
}
}
}然而,当我在batchProcessing中捕捉到异常时,其他的期货仍然在运行。
我尝试并行处理的另一个选择是使用并行收集,这似乎是可行的。即。如果任何一个任务失败,整个并行操作都会失败。然而,与此相关的问题是吞吐量,它受我没有可用的cpus的限制。因为所有的任务都是长时间运行的,并且阻塞,所以并行收集在那里是不合适的。
发布于 2020-09-03 01:16:19
内置的scala Future一旦启动就不能被中断。
看起来你需要一些像monix Task或ZIO这样的东西,它们可以很容易地被中断和重试。
https://stackoverflow.com/questions/63710411
复制相似问题