首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >期货列表-当异常发生时将其全部删除

期货列表-当异常发生时将其全部删除
EN

Stack Overflow用户
提问于 2020-09-03 01:09:25
回答 1查看 283关注 0票数 0

我正在使用scala futures异步提交上千个作业。我还实现了一个由并发阻塞队列支持的ThrottledExecutionContext,因此它一次最多只运行100个作业,并将其余作业放入队列中。这是一个阻塞操作,因为它涉及调用自身的第三方服务。当其中一个抛出异常时,我需要重试整个操作(数千个作业)或跳过整个批处理。当一些期货仍在运行时,我不能重试。我有一种方法可以知道在任何给定的时间点有多少作业在第三方系统(spark)中运行。因此,一旦我捕捉到异常,我想首先杀死所有剩余的期货,清空队列,等待第三方完成该批处理的所有未决作业,然后重试。那么有没有办法在一个异常的情况下杀死所有的期货呢?

根据下面的讨论,我尝试了failFast,但它并没有做我期望的事情。我对Promise还没有更好的理解。但是似乎我们可以用Promise来控制Future的未来!

Scala Future/Promise fast-fail pipeline

代码语言:javascript
复制
  var atomicnt = new AtomicInteger() // to track how many jobs were finished when exception occured

  def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
    val promise = Promise[Seq[T]]
    futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}
    val res = Future.sequence(futures)
    promise.completeWith(res).future
  }

  def normalTask() =  {
    println("Starting normaltask")
    Thread.sleep(2000 + Random.nextInt(5000))
    if(Random.nextDouble() > 0.5) {
      println("Throwing random exception..")
      throw new RuntimeException("Random exception from normalTask")
    }
    atomicnt.getAndIncrement
    Thread.sleep(2000 + Random.nextInt(5000))
    println("Finished normaltask")
  }

  def testException() = {
    val rg = (0 until 500)
    val futures = rg.map(i =>{
      Future(normalTask)
    })
    val res = failFast(futures)
    Await.result(res, Duration.Inf) //blocking here to wait for all 500 to finish    
  }

  def batchProcessing() {
    
    try {
      println("Starting batchProcessing")
      testException()
      println("Exiting batchProcessing")      
    } catch {
      case t: Throwable => {
        println("Error in main")
        Thread.sleep(10000) //Here while waiting other futures are still running
        t.printStackTrace()
        // retry logic goes here based on failure or entire batch will be skipped 
      }
    }
    
  }

然而,当我在batchProcessing中捕捉到异常时,其他的期货仍然在运行。

我尝试并行处理的另一个选择是使用并行收集,这似乎是可行的。即。如果任何一个任务失败,整个并行操作都会失败。然而,与此相关的问题是吞吐量,它受我没有可用的cpus的限制。因为所有的任务都是长时间运行的,并且阻塞,所以并行收集在那里是不合适的。

EN

回答 1

Stack Overflow用户

发布于 2020-09-03 01:16:19

内置的scala Future一旦启动就不能被中断。

看起来你需要一些像monix Task或ZIO这样的东西,它们可以很容易地被中断和重试。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63710411

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档