我需要创建一个API,它应该是流,它收集事件。问题是,这些事件可能来自一个通道(我需要一个模拟的PublishSubject)和一个流(它执行一个网络请求)。
我也不确定这是否是最好的解决方案,所以让我知道我是否能使它更好。
我在做什么:
我的api:
override val statusFlow = trackStatus()
private fun trackStatus(): Flow<State> = flow { ... }
private val deviceChannel = Channel<State>(CONFLATED)因此,statusFlow应该返回一个流,我可以从其中接收流和通道的数据。
我试图通过consumeAsflow将通道转换为flow,但它不起作用。
我认为解决办法是
private fun trackStatus(): Flow<State> = flowOf(channel.toFlow(), flow).flattenMerge()正确的方法是什么?
发布于 2020-06-02 09:35:23
这种情况的解决方案是merge(),正如省答案中所注意到的那样,但是它不会那样工作。您应该使用BroadcastChannel而不是通道,因为最后一次订阅只能在一生中提供一次订阅。另外,您应该使用asFlow()将该通道转换为流。
发布于 2020-06-01 22:45:09
private fun trackStatus() = merge(deviceChannel.recieveAsFlow(), trackStatus)coroutines库中merge()的定义是
/**
* Merges the given flows into a single flow without preserving an order of elements.
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()https://stackoverflow.com/questions/62130107
复制相似问题