首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么在流数据集中使用缓存会导致"AnalysisException:必须用writeStream.start()执行流源查询“而失败?

为什么在流数据集中使用缓存会导致"AnalysisException:必须用writeStream.start()执行流源查询“而失败?
EN

Stack Overflow用户
提问于 2017-02-06 07:07:00
回答 1查看 6.7K关注 0票数 13
代码语言:javascript
复制
SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema)
  .json("src/test/data")
  .cache
  .writeStream
  .start
  .awaitTermination

在Spark2.1.0中执行此示例时,我得到了错误。如果没有.cache选项,它将按预期工作,但对于.cache选项,我得到了:

代码语言:javascript
复制
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)

有什么想法吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-05-30 19:21:30

您的(非常有趣的)案例归结为以下一行(可以在spark-shell中执行):

代码语言:javascript
复制
scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
  at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
  at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
  at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
  ... 48 elided

原因很简单,很容易解释(没有双关火花SQL的explain意图)。

spark.readStream.text("files")创建了一个所谓的流数据集

代码语言:javascript
复制
scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]

scala> files.isStreaming
res2: Boolean = true

流数据集是Spark的结构化流的基础。

正如您在结构化流的快速示例中所读到的

然后使用start()启动流计算。

引用DataStreamWriter的开始的scaladoc

start():StreamingQuery启动流查询的执行,该查询将在新数据到达时不断地将结果输出到给定的路径。

因此,您必须使用start (或foreach)开始执行流查询。你已经知道了。

But...there是结构化流中的无支援行动

此外,还有一些Dataset方法无法在流数据集上工作。它们是将立即运行查询和返回结果的操作,这在流数据集中没有意义。 如果尝试这些操作,您将看到类似于“流式DataFrames/Datasets不支持XYZ操作”的AnalysisException。

看起来很眼熟,不是吗?

在不受支持的操作列表中,cache而不是,但这是因为它被忽略了(我报告了火花-20927来修复它)。

cache应该在列表中,因为它是做的,在Spark的CacheManager中注册查询之前,执行查询。

让我们深入到火种的深处SQL...hold你的呼吸..。

cache persist while persist 请求当前CacheManager缓存查询。

代码语言:javascript
复制
sparkSession.sharedState.cacheManager.cacheQuery(this)

缓存查询时,CacheManager 执行 执行它

代码语言:javascript
复制
sparkSession.sessionState.executePlan(planToCache).executedPlan

我们知道是不允许的,因为这样做是start (或foreach)的。

问题解决了!

票数 21
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42062092

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档