这是来自KafkaMessageListenerContainer的代码,在什么情况下可能发生InterruptedException,可能会被此代码抛出,突然出现在我的应用程序日志中,它消耗了来自主题和进程的消息,然后确认消息,请参见以下错误消息
Interrupted while queuing ack for
@Override
public void acknowledge() {
try {
if (ListenerConsumer.this.autoCommit) {
throw new IllegalStateException("Manual acks are not allowed when auto commit is used");
}
ListenerConsumer.this.acks.put(this.record);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
**throw new KafkaException("Interrupted while queuing ack for " + this.record, e);**
}
if (this.immediate) {
ListenerConsumer.this.consumer.wakeup();
}
}发布于 2018-03-21 00:37:48
您正在查看哪个版本;该代码不再存在(因为1.3 -当前版本是2.1.4)。
在任何情况下,线程的任何中断(例如,关闭任务执行器)都会导致该问题。
https://stackoverflow.com/questions/49386130
复制相似问题