最近,我开始和卡夫卡一起学习。我正在做的这个项目使用了萨拉马。
对于阅读消息,我使用ConsumerGroup。
如果foo返回false,我需要在一段时间后再读取该消息。这是如何做到的呢?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}发布于 2020-05-04 13:46:55
通过在消费者组的Setup()回调中包括以下内容,可以将消费组的偏移量重置为旧的偏移量:
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
sess.ResetOffset(topic, partition, offset, "")
return nil
}您还可以通过控制台实现相同的目标:
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--topic myTopicName \
--reset-offsets \
--to-offfset 100 \
--executehttps://stackoverflow.com/questions/61593064
复制相似问题