首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花结构流失败双到检查点文件找不到

火花结构流失败双到检查点文件找不到
EN

Stack Overflow用户
提问于 2018-02-02 05:54:48
回答 1查看 4K关注 0票数 8

我正在测试env上运行火花结构化流。有时,找不到某个检查点文件的作业失败。

其中一个原因可能是卡夫卡的主题保留时间很短。但我已经将.option("failOnDataLoss", "false")添加到SparkSession中了。

我对火花检查点有一些基本的(非常基本的)理解。如果删除检查点dir,我认为应该恢复它。但是,正如我测试的那样,一旦发生了这个错误,删除dir就没有帮助了。我需要使用不同的检查点dir来修复它。

为什么删除检查点dir不起作用?或者是否有一种方法/选项可以帮助避免此错误?

代码语言:javascript
复制
     diagnostics: User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 14, master-testspark.runspark.com, executor 2): java.lang.IllegalStateException: Error reading delta file consumer-cp3/state/0/4/1.delta of HDFSStateStoreProvider[id = (op=0, part=4), dir = consumer-cp3/state/0/4]: consumer-cp3/state/0/4/1.delta does not exist
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:265)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:200)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

    at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1225)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1538)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:332)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:327)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:340)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:786)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:407)
    ... 27 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:272)
    at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
    at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1238)
    ... 39 more
EN

回答 1

Stack Overflow用户

发布于 2018-02-02 06:18:22

Checkpointing是一个截断RDD lineage graph并将其保存到可靠的分布式(HDFS)或本地文件系统的过程。

来自、、、n两种类型的检查点。

  • 元数据检查点 将定义流计算的信息保存到fault-tolerant存储中,如HDFS。这用于从运行streaming application驱动程序的节点的故障中恢复。
  • 数据检查点 这在一些跨多个批次组合数据的stateful transformations中是必要的。在这样的转换中,生成的RDDs依赖于以前批的RDDs,这导致依赖链的长度随着时间的推移而增加。为了避免恢复时间的无界增长(与依赖链成正比),中间RDDs of stateful transformations被周期性地checkpointed到可靠存储(例如HDFS)以切断依赖链。

如果您想检查它的内部工作方式,请查看Tathagata 使用检查点的容错技术的这个火花博客

Spark使用Checkpoint从故障中恢复。不能重新创建已删除的目录。这就是为什么你要得到这个例外。

代码语言:javascript
复制
java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
票数 -4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48576508

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档