首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Sarama Go Kafka消费者从最新的抵消中消费

如何使用Sarama Go Kafka消费者从最新的抵消中消费
EN

Stack Overflow用户
提问于 2019-07-16 13:33:48
回答 1查看 5.2K关注 0票数 3

我有三个问题:

  1. “最古老的抵消”的含义是什么?最老的偏移量不意味着偏移量为0?

// OffsetOldest表示代理上可用的最古老的偏移量 //分区。 OffsetOldest int64 = -2

  1. SupposingA。在同一台机器上运行的三个经纪人 B.消费者群体只有一条消费者线索 消费者吐露OffsetOldest旗子。 如果使用者线程重新启动,已经产生了100个msgs,并且当前使用者线程已经消耗了90 msgs.So,那么这个使用者将从哪一个偏移开始消费?是91还是0?
  2. 在下面的代码中,它似乎每次启动使用者时都会重新使用消息。但实际上这种事总是会发生的。为什么重消费只是在重新启动之后发生了几次(不是全部)? ConsumeClaim(会话sarama.ConsumerGroupSession,索赔sarama.ConsumerGroupClaim)错误{消息:= range claim.Messages() { this.handler( message ) session.MarkMessage(message,"") }返回nil } ctx := context.Background() conf := sarama.NewConfig() conf.Version = sarama.V2___0 conf.Consumer.Offsets.Initial = sarama.OffsetOldest conf.Consumer.Return.Errors =真正的使用者,如果err != nil { sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers,(“NewConsumerGroupFromClient(%s)错误:%v",groupId,err)返回}
EN

回答 1

Stack Overflow用户

发布于 2019-07-16 14:02:25

  1. 不是的。应用保留策略时,将从主题中删除旧消息。因此,最早的偏移量可能不是第一次偏移量(即0)。
  2. 这取决于您的配置。基本上,您有三种选择:
代码语言:javascript
复制
- Start consuming from the `earliest` offset
- Start consuming from the `latest` offset
- Start consuming from a specific offset 

  1. 你必须使用sarama.OffsetOldest。从文档

const ( // OffsetNewest )表示日志头偏移量,即将//分配给将生成给分区的下一条消息的偏移量。您可以将其发送到客户端的GetOffset方法以获取此偏移量,或者当//调用ConsumePartition开始使用新消息时。OffsetNewest int64 = -1 // OffsetOldest表示代理上可用于//分区的最古老的偏移量。您可以将其发送到客户端的GetOffset方法以获取此//偏移量,或者在调用ConsumePartition以从代理上仍然可用的//最老偏移量开始消费时。OffsetOldest int64 = -2 )

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57058504

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档