:)
我在一种(奇怪的)情况下结束了自己的工作,简单地说,我不想使用任何来自Kafka的新记录,所以暂停主题中所有分区的sparkStreaming消费(InputDStreamConsumerRecord),执行一些操作,最后继续使用记录。
首先..。,这有可能吗?
我一直在尝试这样的方法:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())但我得到了这个
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]任何帮助,以了解我错过了什么,为什么我要得到空的结果,当消费者有分区分配的时候,将受到欢迎!
版本:Kafka: 0.10火花:2.3.0Scala: 2.11.8
发布于 2020-06-17 17:11:01
是的,可以在代码中添加检查并传递持久存储(本地磁盘、S3、HDFS)路径。
每当你开始/继续你的工作,它将收集卡夫卡消费者组的信息与消费者抵消,从检查点,并开始处理从哪里停止。
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)火花检查因此,每当您用新代码重新启动作业时,它就会
现在,从磁盘读取只是一个一次操作所需的斯帕克加载卡夫卡偏移量,DAG和旧的不完全处理的数据。
一旦完成,它将始终在默认或指定的检查点间隔时继续将数据保存到磁盘。
星火流提供了指定Kafka组id的选项,但Spark结构化流没有。
https://stackoverflow.com/questions/62434153
复制相似问题