首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法使用spark结构流计算文档数量

无法使用spark结构流计算文档数量
EN

Stack Overflow用户
提问于 2020-04-14 19:11:19
回答 1查看 52关注 0票数 1

我正在尝试使用couchbase作为spark structured使用spark connector的流媒体来源。

代码语言:javascript
复制
val records = spark.readStream
.format(“com.couchbase.spark.sql”).schema(schema)
.load()

我有一个问题

代码语言:javascript
复制
records
.groupBy(“type”)
.count()
.writeStream
.outputMode(“complete”)
.format(“console”)
.start()
.awaitTermination()

对于这个查询,我没有得到正确的输出。我的查询输出表如下

代码语言:javascript
复制
Batch: 0
20/04/14 14:28:00 INFO CodeGenerator: Code generated in 10.538654 ms
20/04/14 14:28:00 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@17fe0ec7 committed.
±-------±----+
|type | count|
±-------±----+
±-------±----+

但是,如果我使用couchbase以非流的方式获取文档。喜欢

代码语言:javascript
复制
val cdr = spark.read.couchbase(EqualTo(“type”, “cdr”))
cdr.count()

模式被正确地推断用于该非流操作,并且对于结构化流操作也使用相同的模式。

代码语言:javascript
复制
INFO N1QLRelation: Inferred schema is StructType(StructField(META_ID,StringType,true), StructField(_class,StringType,true), StructField(accountId,StringType,true), 

给出正确的输出。(count= 28)。

请让我知道为什么这不适用于结构化流媒体。

EN

回答 1

Stack Overflow用户

发布于 2020-04-19 15:28:42

这可能是因为您只流式传输从现在开始发生更改的内容,而不是过去的事件。如果你想“从头开始”流式传输所有内容,你需要指定它。

此博客帖子中显示了示例:https://blog.couchbase.com/couchbase-spark-connector-2-0-0-released/

基本上,在您的流中,需要指定以下行

代码语言:javascript
复制
  .couchbaseStream(from = FromBeginning, to = ToInfinity)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61206324

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档