我有一个用scala写的使用futures的spark应用程序。所以直到昨天,我还没有在未来中进行任何try/catch,但我需要添加。在try/catch之前,将来的任何错误都会停止spark应用程序,这是我想要的行为,但在try/catch之后,应用程序完成了,没有任何错误。
这是我的主要对象:
object TableProcessorWrapper extends SparkSessionWrapper {
def main(args: Array[String]): List[Unit] = {
implicit val ec = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(4))
val dynamodb = DynamodbOperations()
val tables = dynamodb.getTablesToProcess(args(0), "tabela")
val processors = for {
table <- tables
} yield Future {
TableProcessor(table).start(spark)
}
Await.result(Future.sequence(processors), Duration.Inf)
}
}这是未来代码的一部分:
class TableProcessor(
fileReader: FileReader,
tableProcessorConfig: TableProcessorConfig,
transformers: Seq[BaseTransformer],
anonymization: AnonymizationBase,
schemaConverter: SchemaConverter,
logAnalytics: LogAnalytics
) {
def start(spark: SparkSession): Unit = {
try {
CODE HERE
} catch {
case e: Exception =>
logAnalyticsWithFiles.stopProcessing().failedProcessing().writeLog()
throw new Exception(e)
}我做错了什么?
发布于 2021-01-07 00:23:20
首先,让我们将Spark从这个问题中分离出来,因为一旦你涉及到spark,你的CODE HERE部分在这个问题中就变得非常重要。使用Spark,你的代码应该在Spark的executor节点完成大部分工作,所以在那里抛出的任何异常可能不会出现在你的驱动程序代码中。
所以,让我们暂时忘记Spark,让我们专注于handle Future exceptions部件。
假设您正在调用一个doSomethingWithTable(table: YourTableType): Unit方法。
val processors =
for {
table <- tables
} yield Future {
doSomethingWithTable(table)
}
// or
val processors = tables.map(table => Future(doSomethingWithTable(table))现在,您的processors应该是一个Seq[Future[Unit]]。这些期货中的每一个都可以独立于其他期货而失败/成功。现在,您可以通过单独提供错误处理程序来简单地处理这些故障,
futures.foreach(_.recover(throwable => logException(throwable)))
def logException(throwable: Throwable): Unit = {}如果您只关心“停止Spark应用程序”,那么添加或删除try-catch应该不会对此产生任何影响。所以除非你的JVM进程退出,否则这些进程将继续运行直到结束。因为你在捕获它之后重新抛出异常,你的Future进程终止应该像没有try-catch时一样。
https://stackoverflow.com/questions/65598825
复制相似问题