首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡消费者:如何在Go Sarama中使用特定的偏移量

卡夫卡消费者:如何在Go Sarama中使用特定的偏移量
EN

Stack Overflow用户
提问于 2020-05-04 13:12:38
回答 1查看 2.6K关注 0票数 3

最近,我开始和卡夫卡一起学习。我正在做的这个项目使用了萨拉马。

对于阅读消息,我使用ConsumerGroup

如果foo返回false,我需要在一段时间后再读取该消息。这是如何做到的呢?

代码语言:javascript
复制
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    for message := range claim.Messages() {

            if ok := foo(message); ok {
                session.MarkMessage(message, "")
            } else {
                // ???
            }

    }

    return nil
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-04 13:46:55

通过在消费者组的Setup()回调中包括以下内容,可以将消费组的偏移量重置为旧的偏移量:

代码语言:javascript
复制
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
    sess.ResetOffset(topic, partition, offset, "")

    return nil
}

您还可以通过控制台实现相同的目标:

代码语言:javascript
复制
kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --topic myTopicName \
    --reset-offsets \
    --to-offfset 100 \
    --execute
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61593064

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档