首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Trigger.Once中使用Spark Structured

在Trigger.Once中使用Spark Structured
EN

Stack Overflow用户
提问于 2017-08-16 12:38:38
回答 2查看 7.3K关注 0票数 12

有一个CSV文件的数据湖,这些文件全天都在更新。我正在尝试使用Trigger.Once功能outlined in this blog post创建一个Spark Structured作业,以定期将新数据写入到Parquet数据湖中的CSV数据湖中。

这就是我所拥有的:

代码语言:javascript
复制
val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")

下面的命令将所有数据写入到拼图湖中,但在写入所有数据后并未停止(我必须手动取消该作业)。

代码语言:javascript
复制
processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

以下作业也有效,但在写入所有数据后也没有停止(我必须手动取消作业):

代码语言:javascript
复制
val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()

以下命令在写入任何数据之前停止查询。

代码语言:javascript
复制
val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()

如何将writeStream查询配置为等到所有增量数据都写入拼图文件后再停止?

EN

回答 2

Stack Overflow用户

发布于 2018-07-01 01:35:39

我让Structured Streaming + Trigger.Once在拼图数据湖上正常工作。

我不认为它与CSV数据湖一起工作,因为CSV数据湖在嵌套目录中有大量的小文件。Spark不喜欢使用小的CSV文件(我认为它需要打开所有文件才能读取头文件),并且真的讨厌当它需要glob S3目录时。

所以我认为Spark Structured + Trigger.Once代码是很好的-他们只需要让CSV阅读器技术更好。

票数 3
EN

Stack Overflow用户

发布于 2018-02-20 11:31:27

结构化流的主要目的是连续处理数据,而不需要在新数据到达时启动/停止流。有关更多详细信息,请阅读this

从Spark 2.0.0开始,StreamingQuery具有方法processAllAvailable,该方法等待所有源数据被处理并提交到接收器。请注意,scala文档中包含states to use this method for testing purpose only

因此,代码应该是这样的(如果您仍然需要它):

代码语言:javascript
复制
query.processAllAvailable
query.stop
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45705212

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档