有一个CSV文件的数据湖,这些文件全天都在更新。我正在尝试使用Trigger.Once功能outlined in this blog post创建一个Spark Structured作业,以定期将新数据写入到Parquet数据湖中的CSV数据湖中。
这就是我所拥有的:
val df = spark
.readStream
.schema(s)
.csv("s3a://csv-data-lake-files")下面的命令将所有数据写入到拼图湖中,但在写入所有数据后并未停止(我必须手动取消该作业)。
processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")以下作业也有效,但在写入所有数据后也没有停止(我必须手动取消作业):
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.awaitTermination()以下命令在写入任何数据之前停止查询。
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.stop()如何将writeStream查询配置为等到所有增量数据都写入拼图文件后再停止?
发布于 2018-07-01 01:35:39
我让Structured Streaming + Trigger.Once在拼图数据湖上正常工作。
我不认为它与CSV数据湖一起工作,因为CSV数据湖在嵌套目录中有大量的小文件。Spark不喜欢使用小的CSV文件(我认为它需要打开所有文件才能读取头文件),并且真的讨厌当它需要glob S3目录时。
所以我认为Spark Structured + Trigger.Once代码是很好的-他们只需要让CSV阅读器技术更好。
发布于 2018-02-20 11:31:27
结构化流的主要目的是连续处理数据,而不需要在新数据到达时启动/停止流。有关更多详细信息,请阅读this。
从Spark 2.0.0开始,StreamingQuery具有方法processAllAvailable,该方法等待所有源数据被处理并提交到接收器。请注意,scala文档中包含states to use this method for testing purpose only。
因此,代码应该是这样的(如果您仍然需要它):
query.processAllAvailable
query.stophttps://stackoverflow.com/questions/45705212
复制相似问题