在可流动的RxJava 2中,有不同的背压策略,其中最有趣的是:
在整个Rx链中都受到尊重。
在Kotlin有Flow,它宣称它有背压支持。通过使用以下方法,我能够创建具有缓冲区和最新策略的流:
缓冲器:
observeFlow()
.buffer(10)
.collect { ... }最新资料如下:
observeFlow()
.conflate()
.collect { ... }这只是同一个缓冲区操作符上的快捷方式。
但我没能找到和DROP一样的东西。简而言之,当之前的值尚未处理时,DROP将删除流中的任何值。有了流程,我甚至不确定这是可能的。
考虑到本案:
observeFlow()
.backpressureDrop() // non-existent operator, just for illustrative purposes
.map { ... }
.flatMapMerge { ... }
.collect { ... }因此,backpressureDrop应该尊重在流下面所做的任何工作,而操作员不知道下面发生了什么(没有RxJava订阅服务器中类似底部的“请求”方法的显式回调)。因此,这似乎不可能。在收集以前的项目之前,该操作员不应该通过任何事件。
是否有现成的操作符,我错过了,或者有一个简单的方法来实现这样的东西与现有的API?
发布于 2020-02-08 17:42:01
我们可以使用一个由会合通道支持的流来构建它。
当容量为0时,它会创建RendezvousChannel。这个通道根本没有任何缓冲区。只有在发送和接收调用及时(会合)时,元素才会从发送方转移到接收方,因此发送挂起直到另一个协同器调用接收和接收挂起,直到另一个协同调用为止。
一个交会通道没有缓冲区。因此,需要挂起该通道的使用者并等待下一个元素,以便将一个元素发送到该通道。我们可以利用这种质量来删除不使用Channel.offer的通道挂起而不能接受的值,这是一个正常的非挂起函数。
在不违反容量限制并返回true的情况下,立即将元素添加到此队列中。否则,它将立即返回false或如果通道isClosedForSend抛出异常(有关详细信息,请参阅“关闭”)。
因为channelFlow是缓冲的,所以我们需要将Flow<T>.buffer下游应用到0。
/**
* Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
* are offered to the underlying [channelFlow]. If the consumer is not currently suspended and
* waiting for the next element, the element is dropped.
*
* @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
*/
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
collect { offer(it) }
}.buffer(capacity = 0)下面是一个慢用户如何使用它删除元素的示例。
fun main() = runBlocking {
flow {
(0..100).forEach {
emit(it)
delay(100)
}
}.drop().collect {
delay(1000)
println(it)
}
}具有相应的输出:
0
11
21
31
41
51
61
71
81
91发布于 2020-01-26 11:28:20
有什么简单的方法来实现这样的东西吗?
取决于你的直率。我就是这样做的。
背压转化为计划暂停和恢复在合作世界。对于onBackpressureDrop,下游必须表明它已经为某一项做好准备,并为其挂起,而上游不应该等待下游准备就绪。
您必须以无限制的方式消耗上游,并将项目和终端事件移交给下游,等待这些信号。
package hu.akarnokd.kotlin.flow.impl
import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
: AbstractFlow<T>() {
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
coroutineScope {
val consumerReady = AtomicBoolean()
val producerReady = Resumable()
val value = AtomicReference<T>()
val done = AtomicBoolean()
val error = AtomicReference<Throwable>();
launch {
try {
source.collect {
if (consumerReady.get()) {
value.set(it);
consumerReady.set(false);
producerReady.resume();
}
}
done.set(true)
} catch (ex: Throwable) {
error.set(ex)
}
producerReady.resume()
}
while (true) {
consumerReady.set(true)
producerReady.await()
val d = done.get()
val ex = error.get()
val v = value.getAndSet(null)
if (ex != null) {
throw ex;
}
if (d) {
break;
}
collector.emit(v)
}
}
}
}注:可振实现。
所以让我们来看一下实现。
首先,需要5个变量才能在上游的收集器和为下游工作的收集器之间传递信息:- consumerReady表示下游已为下一项做好准备,- producerReady表示生产者已存储下一项(或终端信号),而下游可以恢复- value上游已准备消费的项目- done上游已结束- error上游已失败。
接下来,我们必须启动上游的收集器,因为collector正在挂起,并且在完成之前不会让下游的使用者循环运行。在这个收集器中,我们检查下游消费者是否准备好了(通过consumerReady),如果是的话,存储当前项目,清除就绪标志,并通过producerReady通知其可用性。清除consumerReady将防止后续的上游项目被存储,直到下游本身表明了新的准备状态。
当上游结束或崩溃时,我们设置done或error变量,并指示生产者已经发言。
在launch { }部分之后,我们将继续代表下游收集器使用共享变量。
每一轮的第一件事是指示我们准备好了下一个值,然后等待生产者端信号,它已经将下一个事件放置在共享变量中。
接下来,我们从这些变量中收集值。我们渴望完成或抛出一个错误,并且只是作为最后的手段,向下游收集器重新发射上游项目。
发布于 2020-02-08 08:24:04
从安东斯潘( Anton )发表的这里评论中,有一种方法可以通过使用channelFlow来模仿下降。
但问题是,默认情况下,channelFlow构建器使用BUFFER策略,不允许将容量参数化。
在ChannelFlowBuilder中有一种将容量参数化的方法,但问题是API是内部的,ChannelFlowBuilder是私有的。
但本质上,如果复制粘贴ChannelFlowBuilder实现并创建如下类:
class BackPressureDropFlow<T>(private val source: Flow<T>) : AbstractFlow<T>() {
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
ChannelFlowBuilder<T>({ source.collect { offer(it) } }, capacity = 0)
.collect { collector.emit(it) }
}
}(或直接应用类似于转换的解决方案)。
那它看起来很管用。
这里的关键是使用capacity = 0,它表示在接收到的每个项目上(因为没有缓冲容量)下游将被挂起。
https://stackoverflow.com/questions/59910643
复制相似问题