首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星星之火--卡夫卡的结构化流--不尊重startingOffset=“最早”

星星之火--卡夫卡的结构化流--不尊重startingOffset=“最早”
EN

Stack Overflow用户
提问于 2019-06-19 02:31:10
回答 2查看 6.7K关注 0票数 8

我已经设置了星火结构化流(Spark2.3.2)来阅读Kafka (2.0.0)。如果消息在星火流作业启动前进入主题,则无法从主题开始使用。这种预期的星火流行为是否忽略了最初运行Stream作业之前生成的Kafka消息(即使使用.option("stratingOffsets",“最早”))?

复制步骤

  1. 在开始流作业之前,创建test主题(单个代理、单个分区)并生成主题的消息(在我的示例中有3条消息)。
  2. 使用以下命令启动spark:spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/
  3. 执行下面的星火scala代码。
代码语言:javascript
复制
// Local
val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9097")
  .option("failOnDataLoss","false")
  .option("stratingOffsets","earliest")
  .option("subscribe", "test")
  .load()

// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
  .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
  .start()

预期产出与实际产出

我希望流从offset=1开始,但是它从offset=3开始读取。

我可以看到火花流处理在启动流作业后生成的消息。

这是否是星火流的预期行为,它忽略了最初运行Stream作业之前生成的Kafka消息(即使是使用.option("stratingOffsets","earliest"))?

代码语言:javascript
复制
2019-06-18 21:22:57 INFO  AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
2019-06-18 21:22:57 INFO  AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO  MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO  Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO  AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO  ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO  AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO  AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO  ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO  KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO  MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO  KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}

火花分批模式

我能够确认批处理模式从一开始就读取所以卡夫卡保留配置没有问题。

代码语言:javascript
复制
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9097")
  .option("subscribe", "test")
  .load()

df.count // Long = 3
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-06-27 16:57:32

哈哈,这是一个简单的错误:"stratingOffsets“应该是"startingOffsets”

票数 4
EN

Stack Overflow用户

发布于 2019-06-19 05:16:01

你可以用两种方式来做。将数据从kafka加载到流数据,或者从kafka加载数据到静态数据(用于测试)。

我想你没有看到这些数据是因为团体身份。卡夫卡将致力于消费者群体和抵消在一个内部的话题。确保每个读取的组名称都是唯一的。

以下是两种选择。

选项1:从kafka读取数据到流数据

代码语言:javascript
复制
// spark streaming with kafka 

import org.apache.spark.sql.streaming.ProcessingTime

val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("startingOffsets","earliest")
.option("maxOffsetsPerTrigger","6000")
.load()

val ds2 = ds1.select(from_json($"value".cast(StringType), dataSchema).as("data")).select("data.*")
val ds3 = ds2.groupBy("TABLE_NAME").count()
ds3.writeStream
.trigger(ProcessingTime("10 seconds"))
.queryName("query1").format("console")
.outputMode("complete")
.start()
.awaitTermination()

选项2:从kafka读取数据到静态数据(为了测试,它将从一开始加载)

代码语言:javascript
复制
// Subscribe to 1 topic defaults to the earliest and latest offsets
val ds1 = spark.read.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("spark.streaming.kafka.consumer.cache.enabled","false")
.load()

val ds2 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp")
val ds3 = ds2.select("value").rdd.map(x => x.toString)
ds3.count()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56659273

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档