首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kotlin Flow onBackpressureDrop RxJava2模拟

Kotlin Flow onBackpressureDrop RxJava2模拟
EN

Stack Overflow用户
提问于 2020-01-25 15:24:25
回答 3查看 1.7K关注 0票数 5

在可流动的RxJava 2中,有不同的背压策略,其中最有趣的是:

  • 最新
  • 缓冲器
  • 丢弃

在整个Rx链中都受到尊重。

在Kotlin有Flow,它宣称它有背压支持。通过使用以下方法,我能够创建具有缓冲区和最新策略的流:

缓冲器:

代码语言:javascript
复制
observeFlow()
    .buffer(10)
    .collect { ... }

最新资料如下:

代码语言:javascript
复制
observeFlow()
    .conflate()
    .collect { ... }

这只是同一个缓冲区操作符上的快捷方式。

但我没能找到和DROP一样的东西。简而言之,当之前的值尚未处理时,DROP将删除流中的任何值。有了流程,我甚至不确定这是可能的。

考虑到本案:

代码语言:javascript
复制
observeFlow()
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }

因此,backpressureDrop应该尊重在流下面所做的任何工作,而操作员不知道下面发生了什么(没有RxJava订阅服务器中类似底部的“请求”方法的显式回调)。因此,这似乎不可能。在收集以前的项目之前,该操作员不应该通过任何事件。

是否有现成的操作符,我错过了,或者有一个简单的方法来实现这样的东西与现有的API?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-02-08 17:42:01

我们可以使用一个由会合通道支持的流来构建它。

当容量为0时,它会创建RendezvousChannel。这个通道根本没有任何缓冲区。只有在发送和接收调用及时(会合)时,元素才会从发送方转移到接收方,因此发送挂起直到另一个协同器调用接收和接收挂起,直到另一个协同调用为止。

一个交会通道没有缓冲区。因此,需要挂起该通道的使用者并等待下一个元素,以便将一个元素发送到该通道。我们可以利用这种质量来删除不使用Channel.offer的通道挂起而不能接受的值,这是一个正常的非挂起函数。

Channel.offer

在不违反容量限制并返回true的情况下,立即将元素添加到此队列中。否则,它将立即返回false或如果通道isClosedForSend抛出异常(有关详细信息,请参阅“关闭”)。

因为channelFlow是缓冲的,所以我们需要将Flow<T>.buffer下游应用到0。

代码语言:javascript
复制
/**
 * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
 * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and 
 * waiting for the next element, the element is dropped. 
 * 
 * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
 */
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(capacity = 0)

下面是一个慢用户如何使用它删除元素的示例。

代码语言:javascript
复制
fun main() = runBlocking {
    flow {
        (0..100).forEach {
            emit(it)
            delay(100)
        }
    }.drop().collect {
        delay(1000)
        println(it)
    }
}

具有相应的输出:

代码语言:javascript
复制
0
11
21
31
41
51
61
71
81
91
票数 3
EN

Stack Overflow用户

发布于 2020-01-26 11:28:20

有什么简单的方法来实现这样的东西吗?

取决于你的直率。我就是这样做的。

背压转化为计划暂停和恢复在合作世界。对于onBackpressureDrop,下游必须表明它已经为某一项做好准备,并为其挂起,而上游不应该等待下游准备就绪。

您必须以无限制的方式消耗上游,并将项目和终端事件移交给下游,等待这些信号。

代码语言:javascript
复制
package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
 : AbstractFlow<T>() {
    @ExperimentalCoroutinesApi
    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val consumerReady = AtomicBoolean()
            val producerReady = Resumable()
            val value = AtomicReference<T>()
            val done = AtomicBoolean()
            val error = AtomicReference<Throwable>();

            launch {
                try {
                    source.collect {
                        if (consumerReady.get()) {
                            value.set(it);
                            consumerReady.set(false);
                            producerReady.resume();
                        }
                    }
                    done.set(true)
                } catch (ex: Throwable) {
                    error.set(ex)
                }
                producerReady.resume()
            }

            while (true) {
                consumerReady.set(true)
                producerReady.await()

                val d = done.get()
                val ex = error.get()
                val v = value.getAndSet(null)

                if (ex != null) {
                    throw ex;
                }
                if (d) {
                    break;
                }

                collector.emit(v)
            }
        }
    }
}

注:可振实现。

所以让我们来看一下实现。

首先,需要5个变量才能在上游的收集器和为下游工作的收集器之间传递信息:- consumerReady表示下游已为下一项做好准备,- producerReady表示生产者已存储下一项(或终端信号),而下游可以恢复- value上游已准备消费的项目- done上游已结束- error上游已失败。

接下来,我们必须启动上游的收集器,因为collector正在挂起,并且在完成之前不会让下游的使用者循环运行。在这个收集器中,我们检查下游消费者是否准备好了(通过consumerReady),如果是的话,存储当前项目,清除就绪标志,并通过producerReady通知其可用性。清除consumerReady将防止后续的上游项目被存储,直到下游本身表明了新的准备状态。

当上游结束或崩溃时,我们设置doneerror变量,并指示生产者已经发言。

launch { }部分之后,我们将继续代表下游收集器使用共享变量。

每一轮的第一件事是指示我们准备好了下一个值,然后等待生产者端信号,它已经将下一个事件放置在共享变量中。

接下来,我们从这些变量中收集值。我们渴望完成或抛出一个错误,并且只是作为最后的手段,向下游收集器重新发射上游项目。

票数 3
EN

Stack Overflow用户

发布于 2020-02-08 08:24:04

从安东斯潘( Anton )发表的这里评论中,有一种方法可以通过使用channelFlow来模仿下降。

但问题是,默认情况下,channelFlow构建器使用BUFFER策略,不允许将容量参数化。

在ChannelFlowBuilder中有一种将容量参数化的方法,但问题是API是内部的,ChannelFlowBuilder是私有的。

但本质上,如果复制粘贴ChannelFlowBuilder实现并创建如下类:

代码语言:javascript
复制
class BackPressureDropFlow<T>(private val source: Flow<T>) : AbstractFlow<T>() {

    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        ChannelFlowBuilder<T>({ source.collect { offer(it) } }, capacity = 0)
            .collect { collector.emit(it) }
    }
}

(或直接应用类似于转换的解决方案)。

那它看起来很管用。

这里的关键是使用capacity = 0,它表示在接收到的每个项目上(因为没有缓冲容量)下游将被挂起。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59910643

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档