我正在开发一个模块,在这个模块中,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不要确认Kakfa消息。因此,当我的消费者在下游系统不可用时收到消息时,kakfa的偏移量将不会被提交。但如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,消费者将永远不会收到没有偏移量提交的主题中的那些消息。
也就是说,假设我的消费者被消耗到偏移量4。当下游不可用时,消费者收到两条消息,因此我的消费者没有提交偏移量。因此,toipc中的消息数量现在是6,但偏移量仍然是4。现在下游系统可用,消费者收到新消息(第7条消息)。由于没有来自下游的问题,消费者确认第7条消息,主题的偏移量将设置为7。
有没有办法让我的消费者在收到第7条消息之前就能收到第5条和第6条消息?我在实现中使用了spring cloud stream。
发布于 2019-08-02 00:09:18
参见this answer。
您需要一个SeekToCurrentErrorHandler并抛出一个异常,以便重置偏移量。
https://stackoverflow.com/questions/57311470
复制相似问题