首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >删除spark-structured-streaming写入的损坏拼接文件时会丢失数据吗?

删除spark-structured-streaming写入的损坏拼接文件时会丢失数据吗?
EN

Stack Overflow用户
提问于 2019-05-25 11:12:23
回答 1查看 636关注 0票数 0

我使用spark-structured-streaming作为消费者从kafka获取数据,按照指南参考https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

然后将数据保存到hdfs作为拼图文件。

这是我的问题:程序运行良好,但一些容器很少失败(但它确实发生了),导致了一些损坏的拼接文件。它将导致错误,如不是拼图文件(长度太小: 4)或[.parquet不是拼图文件。期望的幻数在尾部80,65,82,49,但在读取它们时发现56,52,53,51]。我必须将它们移动到其他目录,并确保hive中的查询正常工作。但我不确定是否会因为移动而导致数据丢失。

我知道spark-structured-streaming使用检查点进行恢复,但由于一些数据已经写为拼图,我不确定偏移量是否被标记为提交。

EN

回答 1

Stack Overflow用户

发布于 2019-05-29 18:07:12

我做了一个非常基本的练习,将txt文件加载到由Spark structured streaming读取的文件目录中。结构化流的写入流正在写入拼图文件。加载两个文件后,我看到spark生成的元数据提到了这两个文件。因此,如果删除其中一个文件(包括使用文件接收器创建的元数据文件),从HDFS读取拼图文件将失败,并出现异常(File not found)。

代码语言:javascript
复制
scala> val ParquetDF1 = spark.read.parquet("/user/root/sink2")
19/05/29 09:57:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 19, quickstart.cloudera, executor 2): org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
        at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:537)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:610)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:602)

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/sink2/part-00000-454836ef-f7bc-444e-9a6b-e81e640a196d-c000.snappy.parquet
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2092)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2062)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1975)

这里唯一的区别是-您使用的是Hive,而我直接从HDFS构建的是拼接数据帧。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56301397

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档