首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >电火花流卡夫卡偏移管理

电火花流卡夫卡偏移管理
EN

Stack Overflow用户
提问于 2018-04-12 13:56:01
回答 1查看 6K关注 0票数 2

我一直在做星火流工作,这些工作是通过卡夫卡( kafka )来消费和生产数据的。我使用了directDstream,所以我不得不自己管理偏移量,我们用redis编写和读取offsets.Now有一个问题,当我启动我的客户端时,我的客户需要从redis获得偏移量,而不是使用itself.how中存在的偏移量来显示我编写的代码?现在我已经编写了下面的代码:

代码语言:javascript
复制
   kafka_stream = KafkaUtils.createDirectStream(
    ssc,
    topics=[config.CONSUME_TOPIC, ],
    kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                 "auto.offset.reset": "largest"},
    fromOffsets=read_offset_range(config.OFFSET_KEY))

但是我认为fromOffsets是当星火流客户端开始攻击时的价值(来自redis),而不是在它的running.thank you提供帮助的时候。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-13 10:44:08

如果我正确地理解你,你需要手动设置你的偏移量。我就是这样做的:

代码语言:javascript
复制
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition

stream = StreamingContext(sc, 120) # 120 second window

kafkaParams = {"metadata.broker.list":"1:667,2:6667,3:6667"}
kafkaParams["auto.offset.reset"] = "smallest"
kafkaParams["enable.auto.commit"] = "false"

topic = "xyz"
topicPartion = TopicAndPartition(topic, 0)
fromOffset = {topicPartion: long(PUT NUMERIC OFFSET HERE)}

kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, fromOffsets = fromOffset)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49798501

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档