首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >rxJava buffer(),用于响应反压力的时间

rxJava buffer(),用于响应反压力的时间
EN

Stack Overflow用户
提问于 2018-04-26 18:20:36
回答 4查看 1.1K关注 0票数 2

根据JavaDoc,不按时间操作的buffer操作符的版本尊重反压力:

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-int-

但是,任何涉及基于时间的缓冲区的buffer版本都不支持背压,就像这个版本一样

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-long-java.util.concurrent.TimeUnit-int-

我理解这是因为一旦时间流逝,你就不能停止它,比如interval运算符,因为同样的原因,它也不支持背压。

我想要的是一个基于大小和时间的缓冲操作符,并通过将背压信号传播到上游和计时生成器来完全支持背压,如下所示:

代码语言:javascript
复制
someFlowable()
.buffer(
     Flowable.interval(1, SECONDS).onBackpressureDrop(),
     10
);

所以现在我可以在背压信号上去掉滴答声。

这是目前在rxJava2中可以实现的吗?Project-Reactor怎么样?

EN

回答 4

Stack Overflow用户

发布于 2019-03-13 15:02:21

我最近遇到了这个问题,下面是我的实现。它可以像这样使用:

代码语言:javascript
复制
    Flowable<List<T>> bufferedFlow = (some flowable of T)
                              .compose(new BufferTransformer(1, TimeUnit.MILLISECONDS, 8))

它支持通过您指定的计数进行反压。

下面是实现:https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d

票数 2
EN

Stack Overflow用户

发布于 2019-08-02 06:24:52

当我使用DisposableSubscriber作为订阅者时,我遇到了来自https://stackoverflow.com/a/55136139/6719538的解决方案的问题,据我所知,这个转换器不考虑来自下游订阅者的调用Suscription#request (它可能会使它们溢出)。我创建了在production - BufferTransformerHonorableToBackpressure.java中测试过的版本。fang-yang -非常尊重idea。

票数 1
EN

Stack Overflow用户

发布于 2018-11-29 01:46:09

已经有一段时间了,但我又看了一遍,不知何故我突然意识到:

代码语言:javascript
复制
public static <T> FlowableTransformer<T, List<T>> buffer(
    int n, long period, TimeUnit unit)
{
    return o ->
        o.groupBy(__ -> 1)
         .concatMapMaybe(
             gf ->
                 gf.take(n)
                   .take(period, SECONDS)
                   .toList()
                   .filter(l -> !l.isEmpty())
         );
}

基本上就是我描述的那样。如果我是正确的,这是完全反压的,并且将缓冲n个项目,或者在指定时间之后(如果没有收集足够的项目)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50040510

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档