我为SparkContext添加了检查点,并为长期运行的火花结构化流作业编写了对kafka数据流的查询。
spark.sparkContext.setCheckpointDir("/tmp/checkpoint")
...
val monitoring_stream = monitoring_df.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.option("checkpointLocation", "s3a://spark-checkpoint/checkpointfiles")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty)
{
}
.start()
.awaitTermination()火花作业运行稳定。但是,我注意到检查点文件是在HDFS和S3中积累的,没有自动清理。我看到这些文件不断地占用存储空间。是否有办法为这些检查点文件配置保留时间以使其自动删除?还是需要运行某个cron作业来手动删除它们?如果我手动删除它们,会不会影响正在进行的火花作业?谢谢!
发布于 2020-09-27 17:21:07
spark.cleaner.referenceTracking.cleanCheckpoints需要设置为true,默认值为false。
https://stackoverflow.com/questions/64087582
复制相似问题