
Recovering a Structured Streaming query from exceptions

Stack Overflow user
Asked on 2020-05-07 19:19:13
4 answers · 3.6K views · 0 following · 5 votes

Is it possible to automatically recover from an exception thrown during query execution?

Context: I'm working on an application that reads data from a Kafka topic, processes it, and writes the output to S3. However, after a few days of running in production, the Spark application runs into network issues with S3 that cause an exception to be thrown and stop the application. It's also worth mentioning that the application runs on Kubernetes using GCP's Spark operator.
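For context, here is a minimal sketch of the shape of such a pipeline; the broker, topic, bucket paths, and checkpoint location are placeholders, not the actual application code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-to-s3").getOrCreate()

// Read from a Kafka topic (placeholder broker/topic names)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

// ... processing ...

// Write the processed stream to S3 as Parquet (placeholder paths)
val query = df.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/view/v1")
  .option("checkpointLocation", "s3a://bucket/checkpoints/view-v1")
  .start()

query.awaitTermination()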

From what I've seen so far, these exceptions are minor and a simple restart of the application solves the problem. Can we handle these exceptions and automatically restart the structured streaming query?

Here is an example of the exception being thrown:

    Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
    === Streaming Query ===
    Identifier: ...
    Current Committed Offsets: ...
    Current Available Offsets: ...

    Current State: ACTIVE
    Thread State: RUNNABLE

    Logical Plan: ...

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
    Caused by: org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at io.blahblahView$$anonfun$11$$anonfun$apply$2.apply(View.scala:90)
        at io.blahblahView$$anonfun$11$$anonfun$apply$2.apply(View.scala:82)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at io.blahblahView$$anonfun$11.apply(View.scala:82)
        at io.blahblahView$$anonfun$11.apply(View.scala:79)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        ... 1 more
    Caused by: java.io.FileNotFoundException: No such file or directory: s3a://.../view/v1/_temporary/0
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
        ... 47 more

What would be the simplest way to handle these problems automatically?


4 answers

Stack Overflow user

Accepted answer

Posted on 2020-05-08 19:53:05

After spending way too many hours trying to find an elegant solution to this problem, and finding nothing, here is what I came up with.

Some may say it's a hack, but it's simple, it works, and it solves a complex problem. I tested it in production, and it does solve the goal of automatically recovering from failures caused by occasional minor exceptions.

I call it the query watchdog. Here is the simplest version, where the watchdog will keep retrying the query indefinitely:

val writer = df.writeStream...

// Restart the query indefinitely: if it dies with a
// StreamingQueryException, log it and start a fresh run.
while (true) {
  val query = writer.start()

  try {
    query.awaitTermination()
  } catch {
    case e: StreamingQueryException =>
      println("Streaming Query Exception caught!: " + e)
  }
}

Some may want to replace the while(true) with some kind of counter to limit the number of retries. Someone could also extend this code to send notifications over Slack or email whenever a retry happens; others could simply collect the number of retries in Prometheus. A bounded-retry variant is sketched below.
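For illustration, a minimal sketch of that bounded-retry variant, reusing the writer val from the snippet above; the maxRetries budget is a hypothetical value, and the notification hook is only indicated in a comment:

import org.apache.spark.sql.streaming.StreamingQueryException

// Bounded-retry watchdog: give up after maxRetries failed runs.
val maxRetries = 5          // hypothetical retry budget
var retries    = 0
var finished   = false

while (!finished && retries <= maxRetries) {
  val query = writer.start()
  try {
    query.awaitTermination()
    finished = true         // query ended without an exception
  } catch {
    case e: StreamingQueryException =>
      retries += 1
      // This is where a Slack/email notification or a
      // Prometheus counter increment would go.
      println(s"Retry $retries/$maxRetries after: " + e)
  }
}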

Hope it helps.

Cheers

8 votes

Stack Overflow user

Posted on 2022-11-02 17:57:35

Since you are using the Spark operator, why not use its restart capability? If the controller notices that the application has stopped, it will automatically resubmit it.

This will work when the application actually fails, i.e. the driver stops. There are cases, however, where an exception is thrown in the driver but the driver keeps running without doing anything; in those cases the Spark operator will think the application is still running fine.
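For reference, a minimal sketch of what that restart policy looks like in a SparkApplication manifest for the spark-on-k8s-operator; the application name and the retry/interval values are illustrative assumptions, not from this answer:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: kafka-to-s3                    # illustrative name
spec:
  restartPolicy:
    type: OnFailure                    # resubmit the app when it fails
    onFailureRetries: 3                # illustrative values
    onFailureRetryInterval: 10         # seconds between retries
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20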

1 vote

Stack Overflow user

Posted on 2020-05-08 14:55:07

No, there is no reliable way to do this. And by the way, "no" is also an answer.

  • The logic for checking for exceptions is typically implemented with a try / catch running on the driver.
  • Since unexpected situations at the executor level are already handled as standard by Spark itself for Structured Streaming, the app / job simply crashes once the error is signalled back to the driver if the error is non-recoverable, unless you code a try / catch inside the various foreachXXX constructs (see the sketch after this list).
  • That said, it is not clear for the foreachXXX constructs that the micro batch would be recoverable in such an approach, as far as I can see; some part of the micro batch is highly likely to be lost. Hard to test, though.
  • Given that Spark provides standard services you cannot hook into, why would one be able to insert a loop or a try / catch into the program's source code? Likewise, broadcast variables are an issue, although some claim to have techniques for that, or so they say. But it is not in the spirit of the framework.
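For concreteness, a minimal sketch of a try / catch inside foreachBatch as mentioned in the list above; the output path is a placeholder, and per the caveat above, a swallowed error may still mean part of that micro batch is lost:

import org.apache.spark.sql.DataFrame

df.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    try {
      // placeholder write path
      batchDf.write.parquet(s"s3a://bucket/view/v1/batch=$batchId")
    } catch {
      case e: Exception =>
        // Swallowing the error keeps the query alive, but this
        // batch's output may be partially written or lost.
        println(s"Batch $batchId failed: " + e)
    }
  }
  .start()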

So, good question, as I have wondered about this (in the past) as well.

0 votes
Original content provided by Stack Overflow: https://stackoverflow.com/questions/61666010