在我的反应式应用程序中,我使用了具有慢Subscriber的热Publisher。为了处理需求不足,我使用了onBackpressureBuffer,但可能出现的溢出错误有点可怕。
如何监控Flux.onBackpressureBuffer(maxSize)创建的队列中存在的元素数量?最好采用内置反应器metrics()方法。我使用的是Spring Boot +测微器,如果有什么不同的话。
发布于 2020-03-23 01:34:29
虽然我们没有在Reactor中找到一个简单的方法来解决这个问题,但是我们发现了一个有点"hacky“的方法。这就是:https://github.com/allegro/envoy-control/blob/master/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt#L34
此函数测量各种Flux运算符的缓冲区大小。它不能保证在每个操作员上都能工作,但它在onBackpressureBuffer上进行了测试,结果是肯定的。它是用Kotlin编写的,但是将它移植到Java应该非常容易。
在使用onBackpressureBuffer的情况下,此代码的本质是将Subscription转换为Scannable,然后使用缓冲属性:
flux
.onBackressureBuffer(maxSize)
.doOnSubscribe { subscription ->
// ...
val queueSize = Scannable.from(subscription).scan(Scannable.Attr.BUFFERED)
// ...
}https://stackoverflow.com/questions/59462525
复制相似问题