首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >暂停并恢复KafkaConsumer中的SparkStreaming

暂停并恢复KafkaConsumer中的SparkStreaming
EN

Stack Overflow用户
提问于 2020-06-17 16:46:06
回答 1查看 409关注 0票数 0

:)

我在一种(奇怪的)情况下结束了自己的工作,简单地说,我不想使用任何来自Kafka的新记录,所以暂停主题中所有分区的sparkStreaming消费(InputDStreamConsumerRecord),执行一些操作,最后继续使用记录。

首先..。,这有可能吗?

我一直在尝试这样的方法:

代码语言:javascript
复制
var consumer: KafkaConsumer[String, String] = _    
consumer = new KafkaConsumer[String, String](properties)    
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())

但我得到了这个

代码语言:javascript
复制
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

任何帮助,以了解我错过了什么,为什么我要得到空的结果,当消费者有分区分配的时候,将受到欢迎!

版本:Kafka: 0.10火花:2.3.0Scala: 2.11.8

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-06-17 17:11:01

是的,可以在代码中添加检查并传递持久存储(本地磁盘、S3、HDFS)路径。

每当你开始/继续你的工作,它将收集卡夫卡消费者组的信息与消费者抵消,从检查点,并开始处理从哪里停止。

代码语言:javascript
复制
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

火花检查因此,每当您用新代码重新启动作业时,它就会

  1. 读取和处理序列化数据
  2. ,如果您的Spark
  3. 从最新代码处理新数据中出现任何代码更改,则清除缓存的DAG阶段。

现在,从磁盘读取只是一个一次操作所需的斯帕克加载卡夫卡偏移量,DAG和旧的不完全处理的数据。

一旦完成,它将始终在默认或指定的检查点间隔时继续将数据保存到磁盘。

星火流提供了指定Kafka组id的选项,但Spark结构化流没有。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62434153

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档