我正在尝试使用couchbase作为spark structured使用spark connector的流媒体来源。
val records = spark.readStream
.format(“com.couchbase.spark.sql”).schema(schema)
.load()我有一个问题
records
.groupBy(“type”)
.count()
.writeStream
.outputMode(“complete”)
.format(“console”)
.start()
.awaitTermination()对于这个查询,我没有得到正确的输出。我的查询输出表如下
Batch: 0
20/04/14 14:28:00 INFO CodeGenerator: Code generated in 10.538654 ms
20/04/14 14:28:00 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@17fe0ec7 committed.
±-------±----+
|type | count|
±-------±----+
±-------±----+但是,如果我使用couchbase以非流的方式获取文档。喜欢
val cdr = spark.read.couchbase(EqualTo(“type”, “cdr”))
cdr.count()模式被正确地推断用于该非流操作,并且对于结构化流操作也使用相同的模式。
INFO N1QLRelation: Inferred schema is StructType(StructField(META_ID,StringType,true), StructField(_class,StringType,true), StructField(accountId,StringType,true), 给出正确的输出。(count= 28)。
请让我知道为什么这不适用于结构化流媒体。
发布于 2020-04-19 15:28:42
这可能是因为您只流式传输从现在开始发生更改的内容,而不是过去的事件。如果你想“从头开始”流式传输所有内容,你需要指定它。
此博客帖子中显示了示例:https://blog.couchbase.com/couchbase-spark-connector-2-0-0-released/
基本上,在您的流中,需要指定以下行
.couchbaseStream(from = FromBeginning, to = ToInfinity)https://stackoverflow.com/questions/61206324
复制相似问题