我使用spring框架和kafka,它有3个代理集群。我发现消费者没有消费一些消息(假设所有发送的消息之间有0.01%),所以在生产者代码中,我记录了api返回的消息偏移量:
ListenableFuture<SendResult<String, Message>> future = messageTemplate.sendDefault(id, message);
SendResult<String, Message> sendResult = future.get();
String offset = sendResult.getRecordMetadata().offset();我使用return offset查询所有分区的kafka topic,但是没有找到该消息(我测试了消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,如何确保该消息发送到kafka??
我还在producer中使用了messageTemplate.flush();
发布于 2019-12-29 17:05:04
我发现,当Kafka broker的topic leader关闭时,Kafka将重新平衡自己,另一个broker成为该分区的领导者,如果ack配置未设置为all,则此过程中有可能丢失一些数据。因此,将配置更改为
ack=all此外,如果同步副本中的最小值小于2,则有可能丢失数据,因此至少将其设置为2。
min.insync.replicas = 2https://stackoverflow.com/questions/56170604
复制相似问题