在我们的应用程序中,我们使用kafka组件在手动提交模式下使用kafka主题的消息。在随后的进程中遇到异常(外部服务调用失败-抛出RetryableException)时,我们在ThrottlingExceptionRoutePolicy中连接以打开电路并根据HalfOpenHandler重试电路关闭,遇到RetryableException。
ThrottlingExceptionRoutePolicy { throttledException - RetryableException,failureThreshold - 1,failureWindow =1 mins halfOpenAfter -x mins keepOpen = false }
from("kafka:topicName").routePolicy(exceptionPolicy).to(anotherProcess);
当我们运行单个消费者每个吊舱,电路打开/关闭是按照预期的工作。但是在多个使用者的情况下,电路是打开的,但电路关闭是不尝试的。
查看ThrottlingExceptionRoutePolicy代码:
第一和第二使用者线程都在等待获得锁。
7.取消policy.halfOpenTimer ( h2 ) 7.b,从而取消h2半开计划任务7.c调用this.calculateState()
7.c.1检查状态是否打开,如果elapsedTime >=阈值X,则调用半开处理程序,但elapsedTime为currentTs (t1+x) - openedAt(t2),因此条件永远不为真。所以半开处理程序永远不会被调用。
这种情况可能发生在并行消费者> 1的任何骆驼路线上。
有人遇到过类似的问题吗?
发布于 2021-08-04 20:08:42
它发生在一个concurrentConsumers=1以及concurrencyLimit: 50显然定时器从来没有关闭电路。这是最后一次被记录的声明。2021-08-04 15:56:59.473调试99845
而且,ThrottlingExceptionRoutePolicy.failures只是在failureWindow之外保持增量。
protected boolean isThresholdExceeded() {
boolean output = false;
this.logState();
if (this.failures.get() >= this.failureThreshold && this.lastFailure >= System.currentTimeMillis() - this.failureWindow) {
output = true;
}
return output;
}只要失败>= failureThreshold (在某一时刻)和lastFailure发生在failureWindow中,“输出”将为真。除非调用closeCircuit,否则不会设置失败。那是个虫子。
https://stackoverflow.com/questions/64144516
复制相似问题