根据JavaDoc,不按时间操作的buffer操作符的版本尊重反压力:
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-int-
但是,任何涉及基于时间的缓冲区的buffer版本都不支持背压,就像这个版本一样
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-long-java.util.concurrent.TimeUnit-int-
我理解这是因为一旦时间流逝,你就不能停止它,比如interval运算符,因为同样的原因,它也不支持背压。
我想要的是一个基于大小和时间的缓冲操作符,并通过将背压信号传播到上游和计时生成器来完全支持背压,如下所示:
someFlowable()
.buffer(
Flowable.interval(1, SECONDS).onBackpressureDrop(),
10
);所以现在我可以在背压信号上去掉滴答声。
这是目前在rxJava2中可以实现的吗?Project-Reactor怎么样?
发布于 2019-03-13 15:02:21
我最近遇到了这个问题,下面是我的实现。它可以像这样使用:
Flowable<List<T>> bufferedFlow = (some flowable of T)
.compose(new BufferTransformer(1, TimeUnit.MILLISECONDS, 8))它支持通过您指定的计数进行反压。
下面是实现:https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d
发布于 2019-08-02 06:24:52
当我使用DisposableSubscriber作为订阅者时,我遇到了来自https://stackoverflow.com/a/55136139/6719538的解决方案的问题,据我所知,这个转换器不考虑来自下游订阅者的调用Suscription#request (它可能会使它们溢出)。我创建了在production - BufferTransformerHonorableToBackpressure.java中测试过的版本。fang-yang -非常尊重idea。
发布于 2018-11-29 01:46:09
已经有一段时间了,但我又看了一遍,不知何故我突然意识到:
public static <T> FlowableTransformer<T, List<T>> buffer(
int n, long period, TimeUnit unit)
{
return o ->
o.groupBy(__ -> 1)
.concatMapMaybe(
gf ->
gf.take(n)
.take(period, SECONDS)
.toList()
.filter(l -> !l.isEmpty())
);
}基本上就是我描述的那样。如果我是正确的,这是完全反压的,并且将缓冲n个项目,或者在指定时间之后(如果没有收集足够的项目)
https://stackoverflow.com/questions/50040510
复制相似问题