首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何保存火种消耗到ZK或Kafka的最新偏移量,并在重新启动后可以读取

如何保存火种消耗到ZK或Kafka的最新偏移量,并在重新启动后可以读取
EN

Stack Overflow用户
提问于 2015-08-06 04:04:19
回答 4查看 16.1K关注 0票数 14

我使用Kafka 0.8.2从AdExchange接收数据,然后使用Spark Streaming 1.4.1将数据存储到MongoDB

我的问题是当我重新启动我的Spark Streaming作业时,比如更新新版本,修复bug,添加新功能。它将继续阅读最新的offset of kafka,然后我将失去数据AdX推动卡夫卡在重新启动作业。

我尝试了一些类似于auto.offset.reset -> smallest的东西,但是它最后会从0 ->接收到,那时数据是巨大的,在db中是重复的。

我还尝试将特定的group.idconsumer.id设置为Spark,但它是相同的。

如何将最新的offset火花保存到zookeeperkafka,然后可以将其读回最新的offset

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2015-08-06 06:55:56

createDirectStream函数的一个构造函数可以获得一个映射,该映射将分区id作为键,并将您开始使用的偏移量作为值。

看看api:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html,我通常所说的地图: fromOffsets

您可以将数据插入到地图:

代码语言:javascript
复制
startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)

并在创建直接流时使用它:

代码语言:javascript
复制
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

每次迭代之后,您可以使用以下方法获得已处理的偏移量:

代码语言:javascript
复制
rdd.asInstanceOf[HasOffsetRanges].offsetRanges

您可以在下一次迭代中使用这些数据来构造fromOffsets映射。

您可以在页面末尾看到完整的代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html

票数 15
EN

Stack Overflow用户

发布于 2015-08-10 17:33:18

为了补充的答案,如果您真的想使用ZK作为存储和加载您的偏移地图的地方,您可以。

但是,由于您的结果没有输出到ZK,除非您的输出操作是幂等的(听起来不是这样),否则您将无法获得可靠的语义。

如果可以将结果与单个原子操作中的偏移一起存储在同一个文档中,那么这可能对您更好。

有关更多细节,请参见https://www.youtube.com/watch?v=fXnNEq1v3VA

票数 2
EN

Stack Overflow用户

发布于 2016-06-29 20:09:36

下面是一些代码,您可以使用这些代码在ZK http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/中存储偏移量

在调用KafkaUtils.createDirectStream:http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/时,可以使用以下代码来使用偏移量

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31846654

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档