可以从查询执行期间抛出的异常自动恢复吗?
上下文:我正在开发一个应用程序,该应用程序可以读取卡夫卡主题中的数据,处理数据并输出到S3。但是,在生产运行几天之后,spark应用程序将面临来自S3的一些网络问题,这会导致抛出一个异常并停止应用程序。还值得一提的是,该应用程序使用GCP的星火k8s算子在Kubernetes上运行。
根据我到目前为止所看到的,这些异常很小,应用程序的简单重新启动就解决了这个问题。我们能处理这些异常并自动重新启动结构化流查询吗?
这里有一个抛出异常的例子:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: ...
Current Committed Offsets: ...
Current Available Offsets: ...
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan: ...
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at io.blahblahView$$anonfun$11$$anonfun$apply$2.apply(View.scala:90)
at io.blahblahView $$anonfun$11$$anonfun$apply$2.apply(View.scala:82)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at io.blahblahView$$anonfun$11.apply(View.scala:82)
at io.blahblahView$$anonfun$11.apply(View.scala:79)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
... 1 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://.../view/v1/_temporary/0
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
... 47 more,自动处理这些问题的最简单的方法是什么?
发布于 2020-05-08 19:53:05
在花费了太多的时间试图找到一个优雅的解决这个问题,而没有找到任何东西,下面是我的想法。
有人可能会说这是一个黑客,但它很简单,它的工作和解决一个复杂的问题。我在生产中测试了它,它解决了由于偶尔出现的小异常而自动从故障中恢复的问题。
我称它为,查询看门狗。下面是最简单的版本,监督狗将无限期地重试运行查询:
val writer = df.writeStream...
while (true) {
val query = writer.start()
try {
query.awaitTermination()
}
catch {
case e: StreamingQueryException => println("Streaming Query Exception caught!: " + e);
}
}有些人可能想用某种计数器替换while(true),以限制重试的次数。当重试发生时,也可以有人补充此代码,并通过松弛或电子邮件发送通知。其他人只需收集普罗米修斯的重试次数。
希望能帮上忙
干杯
发布于 2022-11-02 17:57:35
既然您使用的是星火运算符,为什么不使用它的重新启动功能呢?如果控制器注意到应用程序已经停止,那么它将自动重新提交它。
这将在应用程序失败的情况下工作,即驱动程序停止。在某些情况下,会抛出驱动程序异常,但驱动程序仍然没有执行任何操作。在这种情况下,Spark操作符会认为应用程序仍然运行正常。
发布于 2020-05-08 14:55:07
不,没有可靠的方法可以做到这一点。顺便说一句,不也是一个答案。
- That said, it is not clear for the foreachXXX constructs that the micro batch will be recoverable in such an approach afaics, some part of the microbatch is highly likely lost. Hard to test though.
所以,好的问题,因为我想知道(在过去)。
https://stackoverflow.com/questions/61666010
复制相似问题