首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >捕获期货中的异常

捕获期货中的异常
EN

Stack Overflow用户
提问于 2021-01-06 23:42:28
回答 1查看 93关注 0票数 0

我有一个用scala写的使用futures的spark应用程序。所以直到昨天,我还没有在未来中进行任何try/catch,但我需要添加。在try/catch之前,将来的任何错误都会停止spark应用程序,这是我想要的行为,但在try/catch之后,应用程序完成了,没有任何错误。

这是我的主要对象:

代码语言:javascript
复制
object TableProcessorWrapper extends SparkSessionWrapper {

  def main(args: Array[String]): List[Unit] = {

    implicit val ec = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(4))

    val dynamodb = DynamodbOperations()

    val tables = dynamodb.getTablesToProcess(args(0), "tabela")

    val processors = for {
      table <- tables
    } yield Future {
      TableProcessor(table).start(spark)
    }

    Await.result(Future.sequence(processors), Duration.Inf)
  }
}

这是未来代码的一部分:

代码语言:javascript
复制
class TableProcessor(
    fileReader: FileReader,
    tableProcessorConfig: TableProcessorConfig,
    transformers: Seq[BaseTransformer],
    anonymization: AnonymizationBase,
    schemaConverter: SchemaConverter,
    logAnalytics: LogAnalytics
) {

  def start(spark: SparkSession): Unit = {


try {

CODE HERE

} catch {
        case e: Exception =>
          logAnalyticsWithFiles.stopProcessing().failedProcessing().writeLog()
          throw new Exception(e)
      }

我做错了什么?

EN

回答 1

Stack Overflow用户

发布于 2021-01-07 00:23:20

首先,让我们将Spark从这个问题中分离出来,因为一旦你涉及到spark,你的CODE HERE部分在这个问题中就变得非常重要。使用Spark,你的代码应该在Spark的executor节点完成大部分工作,所以在那里抛出的任何异常可能不会出现在你的驱动程序代码中。

所以,让我们暂时忘记Spark,让我们专注于handle Future exceptions部件。

假设您正在调用一个doSomethingWithTable(table: YourTableType): Unit方法。

代码语言:javascript
复制
val processors = 
  for {
    table <- tables
  } yield Future {
    doSomethingWithTable(table)
  }

// or

val processors = tables.map(table => Future(doSomethingWithTable(table))

现在,您的processors应该是一个Seq[Future[Unit]]。这些期货中的每一个都可以独立于其他期货而失败/成功。现在,您可以通过单独提供错误处理程序来简单地处理这些故障,

代码语言:javascript
复制
futures.foreach(_.recover(throwable => logException(throwable)))

def logException(throwable: Throwable): Unit = {}

如果您只关心“停止Spark应用程序”,那么添加或删除try-catch应该不会对此产生任何影响。所以除非你的JVM进程退出,否则这些进程将继续运行直到结束。因为你在捕获它之后重新抛出异常,你的Future进程终止应该像没有try-catch时一样。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65598825

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档