我有一个长期运行的结构化流媒体作业,它消耗了几个Kafka主题,并在滑动窗口上聚合。我需要了解如何在HDFS中管理/清理检查点。
作业运行良好,我能够从失败的步骤中恢复,而不会丢失数据,但是,我可以看到HDFS利用率每天都在增加。我找不到任何关于Spark如何管理/清理检查点的文档。以前,检查点存储在s3上,但由于要读/写大量的小文件,这被证明是非常昂贵的。
query = formatted_stream.writeStream \
.format("kafka") \
.outputMode(output_mode) \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("checkpointLocation", "hdfs:///path_to_checkpoints") \
.start()据我所知,检查点应该是自动清理的;几天后,我看到我的HDFS使用率呈线性增长。如何确保检查点得到管理,并且HDFS不会耗尽空间?
Spark Structured Streaming Checkpoint Cleanup的公认答案是,结构化流应该处理这个问题,但不是如何或如何配置它。
发布于 2019-01-07 19:26:45
正如您在the code for Checkpoint.scala中看到的,检查点机制保留了最后10个检查点数据,但这在几天内应该不会成为问题。
这样做的一个常见原因是,您在磁盘上持久化的RDDs也随着时间线性增长。这可能是由于一些您不关心持久化的RDDs造成的。
您需要确保在使用结构化流式传输时,不存在需要持久化的RDDs。例如,如果要计算数据集的列上不同元素的精确计数,则需要知道完整的输入数据(这意味着如果每个批处理的数据不断涌入,则持久化随时间线性增加的数据)。相反,如果您可以使用近似计数,则可以使用HyperLogLog++之类的算法,这种算法通常需要更少的内存来权衡精度。
请记住,如果您正在使用Spark SQL,您可能希望进一步检查优化后的查询结果,因为这可能与Catalyst如何优化您的查询有关。如果你没有,那么如果你这样做了,Catalyst可能已经为你优化了你的查询。
在任何情况下,进一步的思考:如果检查点使用率随着时间的增加而增加,这应该反映在您的流作业也随着时间线性地消耗更多的RAM,因为检查点只是Spark上下文的序列化(加上恒定大小的元数据)。如果是这种情况,请查看SO中的相关问题,例如why does memory usage of Spark Worker increase with time?。
此外,要有意义地说明您在哪个RDDs上调用.persist() (以及哪个缓存级别,以便您可以将元数据加载到磁盘RDDs中,并且一次只将它们部分加载到Spark上下文中)。
https://stackoverflow.com/questions/54070524
复制相似问题