当我手动确认消费者应用程序中的偏移量时,我希望消费者可以自动重新获得未确认消息,但我不能成功。
这是我的配置文件:
cloud:
stream:
kafka:
binder:
brokers: ****:****
za-nodes: ****
replication-factor: 1
bindings:
input:
consumer:
auto-commit-offset: false
auto-commit-on-error: false
reset-offsets: true发布于 2017-01-25 15:18:14
现在似乎不再使用https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/48#issuecomment-273111200属性resetOffsets。
默认情况下,使用者的auto.offset.reset将被设置为latest (即,如果您使用,则不要使用显式地使用spring.cloud.stream.bindings.input.group为使用者绑定设置组)
发布于 2017-01-25 22:04:34
Kafka没有能力确认单个消息-一旦保存了组/分区的偏移量,所有先前的消息都被认为是“已确认的”-手动确认允许的是在例如消息被异步处理的情况下推迟偏移量提交过程(并且自动确认将在消息被实际处理之前保存偏移量)。
目前不支持resetOffsets (这是我们在迁移到新的消费者客户端后基本上放弃的一个功能-请参阅https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/67),但不幸的是,文档没有反映这一点。
https://stackoverflow.com/questions/41845139
复制相似问题