我在和卡夫卡玩,试着抓住它。我们需要做的一件事是运行负载平衡的服务器--冗余/高可用性/等等--然后彼此独立地重新启动。应该很简单。
不过,我所发现的有些奇怪。如果我运行的是一个处理一组消息的Kafka消费者,然后在处理消息时,我将第二个消费者添加到同一个消费者组中,我会多次获得整个消息集,而不仅仅是一次。
例如,下面是来自这样一个运行的日志文件:https://gist.github.com/sazzer/5604d0652ff14533654c8b543942c10e
这是使用两个主题-卡夫卡-现场和卡夫卡-散装。每个主题有两个分区-每个消费者一个。然后测试将20条消息添加到大容量队列中,然后将10条消息添加到活动队列中。(这实际上是在测试其他东西,但我只是重复使用了设置)
从日志中,您将看到每条消息总共处理3次,而不是像我预期的那样只处理一次。
这方面的代码如下:https://gist.github.com/sazzer/c67e4db9a04aac8c0d46bbc21188775d
这是使用Spring和Spring,而且--除了这个案例--它只是起作用了。
当一个新的消费者出现时,我是不是遗漏了什么来阻止它重播所有的消息?还是这只是我要处理的事?
干杯
发布于 2019-01-21 15:27:14
尝试将ConsumerConfig.ENABLE_AUTO_COMMIT设置为false。
侦听器容器将在每一批记录处理完之后提交偏移集,而不是依赖客户端进行提交;当重新平衡发生时,它还将提交任何挂起的偏移量;您还可以将AckMode设置为RECORD,并将处理每个记录的偏移量。
您还可以手动将分区分配给实例,并且不使用组管理进行分配。
https://stackoverflow.com/questions/54287347
复制相似问题