我使用Kafka 0.8.2从AdExchange接收数据,然后使用Spark Streaming 1.4.1将数据存储到MongoDB。
我的问题是当我重新启动我的Spark Streaming作业时,比如更新新版本,修复bug,添加新功能。它将继续阅读最新的offset of kafka,然后我将失去数据AdX推动卡夫卡在重新启动作业。
我尝试了一些类似于auto.offset.reset -> smallest的东西,但是它最后会从0 ->接收到,那时数据是巨大的,在db中是重复的。
我还尝试将特定的group.id和consumer.id设置为Spark,但它是相同的。
如何将最新的offset火花保存到zookeeper或kafka,然后可以将其读回最新的offset
发布于 2015-08-06 06:55:56
createDirectStream函数的一个构造函数可以获得一个映射,该映射将分区id作为键,并将您开始使用的偏移量作为值。
看看api:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html,我通常所说的地图: fromOffsets
您可以将数据插入到地图:
startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)并在创建直接流时使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))每次迭代之后,您可以使用以下方法获得已处理的偏移量:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges您可以在下一次迭代中使用这些数据来构造fromOffsets映射。
您可以在页面末尾看到完整的代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html
发布于 2015-08-10 17:33:18
为了补充的答案,如果您真的想使用ZK作为存储和加载您的偏移地图的地方,您可以。
但是,由于您的结果没有输出到ZK,除非您的输出操作是幂等的(听起来不是这样),否则您将无法获得可靠的语义。
如果可以将结果与单个原子操作中的偏移一起存储在同一个文档中,那么这可能对您更好。
发布于 2016-06-29 20:09:36
下面是一些代码,您可以使用这些代码在ZK http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/中存储偏移量
在调用KafkaUtils.createDirectStream:http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/时,可以使用以下代码来使用偏移量
https://stackoverflow.com/questions/31846654
复制相似问题