首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka-http异常处理(Scala)

Akka-http异常处理(Scala)
EN

Stack Overflow用户
提问于 2019-10-10 18:43:58
回答 1查看 662关注 0票数 0

我正在使用Akka-hhtp (scala)向API异步发送多个http批处理请求,并且想知道当响应代码不是200OK时,处理异常的正确方法是什么。

下面是一些伪代码来演示我的观点。

代码语言:javascript
复制
/* Using For comprehension here because the server API has restriction on the amount of data we can send and the time it takes them to process each request. So they require us to send multiple mini requests instead. If one of those fails, then our entire job should fail.*/

val eventuallyResponses = for {
    batches <- postBatch(payload)
} yield batches

val eventualResponses = Future.sequence(eventuallyResponses)

/* Do I need to recover here? If I don't, will the actor system terminate? */
eventualResponses.recover { case es =>
   log.warn("some message")
   List()
}

/* As I said I need to wait for all mini batch requests to complete. If one response is different than 200, then the entire job should fail. */
val result = Await.result(eventualResponses, 10.minutes)


actorSystem.terminate().oncomplete{
  case Success(_) =>
      if (result.isEmpty) =>
          /* This doesn't seem to interrupt the program */
          throw new RuntimeException("POST failed")
      } else {
          log.info("POST Successful")
      }
   case Failure(ex) =>
      log.error("error message $ex")
      throw ex
}

def postBatch(payload) = {
    val responseFuture: Future[HttpResponse] = httpClient.post(payload)

     responseFuture.flatMap{ res =>
       res.status match {
         case StatusCodes.OK => Future.successful(res)
         case _ => Future.failed(new RuntimeException("error message"))
       }
      }
}

当我们收到不同于OK的StatusCodes时,上面的代码抛出异常。它确实通过了result.isEmpty true的分支,但是它似乎不会停止/中断程序的执行。我需要它这样做,因为这是作为Autosys作业调度的,并且如果至少有一个批处理请求返回的响应不同于200OK,我需要使作业失败。

如果我不执行recover并抛出异常(当我们收到非200状态码时),执行元系统会被正确终止吗?

你知道做上述事情的好方法吗?

谢谢:)

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-10-10 22:47:29

据我所知,如果一些响应没有状态200,你需要从main body抛出一个异常。

代码语言:javascript
复制
def postBatch(payload: HttpRequest)(implicit system: ActorSystem, ec: ExecutionContext): Future[HttpResponse] = {
    Http().singleRequest(payload).flatMap(response => response.status match {
        case StatusCodes.OK => Future.successful(response)
        case _ => Future.failed(new RuntimeException("error message"))
    })
}

val reuests: List[HttpRequest] = List(...)
/*
You don't need for comprehension here because
val eventuallyResponses = for {
  batches <- postBatch(payload)
} yield batches

is equal to
val eventuallyResponses = postBatch(payload)

For comprehension doesn't process recursive sending. If you need it you should write it yourself by flatMap on each request future.
*/
val eventualResponses: Future[List[HttpResponse]] =
    Future.sequence(reuests.map(postBatch)) //also, its better to add some throttling logic here

//as far as i understand you need to wait for all responses and stop the actor system after that
Await.ready(eventualResponses, 10 minutes) //wait for all responses
Await.ready(actorSystem.terminate(), Duration.Inf) //wait for actor system termination

//because of Await.ready(eventualResponses, 10 minutes) we can match on the future value and expect that it should be completed  
eventualResponses.value match {
    case Some(Success(responses)) =>
        log.info("All requests completed")
    case Some(Failure(exception)) =>
        log.error("Some request failed")
        throw exception //rethrow this exception
    case None =>
        log.error("Requests timed out")
        throw RuntimeException("Requests timed out")
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58321098

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档