首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用Kotlin-coroutines实现Observale.amb?

用Kotlin-coroutines实现Observale.amb?
EN

Stack Overflow用户
提问于 2021-10-09 03:38:09
回答 1查看 67关注 0票数 1

例如,我有3个来自不同端点的数据源。我想并行地呼叫所有这些呼叫,并获得第一个应答(最快),然后丢弃其他呼叫。我知道如何在Observable.amb()中使用RxJava。如何使用Kotlin协程实现它?重要的是-不需要在第一个结果之后等待其他调用。

代码语言:javascript
复制
suspend fun dataSourceOne(){
   delay(1_000L)
}

suspend fun dataSourceTwo(){
   delay(2_000L)
}

suspend fun dataSourceThree(){
   delay(3_000L)
}

// should call [dataSourceOne(), dataSourceTwo(), dataSourceThree()] in parallel 
// and discard [dataSourceTwo(), dataSourceThree()] after the getting a result from dataSourceOne()

PS: Android应用程序。

EN

回答 1

Stack Overflow用户

发布于 2021-10-11 09:22:11

您可以启动收集器,使用原子整数来索引获胜者,并跟踪要取消的所有其他作业。例如:

代码语言:javascript
复制
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

@FlowPreview
class FlowAmbIterable<T>(private val sources: Iterable<Flow<T>>) : Flow<T> {
    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) {
        val winner = AtomicInteger()
        val jobs = ConcurrentHashMap<Job, Int>()
        coroutineScope {
            var i = 1
            for (source in sources) {
                val idx = i
                val job = launch {
                    source.collect {
                        val w = winner.get()
                        if (w == idx) {
                            collector.emit(it)
                        } else if (w == 0 && winner.compareAndSet(0, idx)) {
                            for (j in jobs.entries) {
                                if (j.value != idx) {
                                    j.key.cancel()
                                }
                            }

                            collector.emit(it)
                        } else {
                            throw CancellationException()
                        }
                    }
                }

                jobs[job] = i
                val w = winner.get()
                if (w != 0 && w != i) {
                    job.cancel()
                    break
                }

                i++
            }
        }
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69503559

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档