例如,我有3个来自不同端点的数据源。我想并行地呼叫所有这些呼叫,并获得第一个应答(最快),然后丢弃其他呼叫。我知道如何在Observable.amb()中使用RxJava。如何使用Kotlin协程实现它?重要的是-不需要在第一个结果之后等待其他调用。
suspend fun dataSourceOne(){
delay(1_000L)
}
suspend fun dataSourceTwo(){
delay(2_000L)
}
suspend fun dataSourceThree(){
delay(3_000L)
}
// should call [dataSourceOne(), dataSourceTwo(), dataSourceThree()] in parallel
// and discard [dataSourceTwo(), dataSourceThree()] after the getting a result from dataSourceOne()PS: Android应用程序。
发布于 2021-10-11 09:22:11
您可以启动收集器,使用原子整数来索引获胜者,并跟踪要取消的所有其他作业。例如:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
@FlowPreview
class FlowAmbIterable<T>(private val sources: Iterable<Flow<T>>) : Flow<T> {
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>) {
val winner = AtomicInteger()
val jobs = ConcurrentHashMap<Job, Int>()
coroutineScope {
var i = 1
for (source in sources) {
val idx = i
val job = launch {
source.collect {
val w = winner.get()
if (w == idx) {
collector.emit(it)
} else if (w == 0 && winner.compareAndSet(0, idx)) {
for (j in jobs.entries) {
if (j.value != idx) {
j.key.cancel()
}
}
collector.emit(it)
} else {
throw CancellationException()
}
}
}
jobs[job] = i
val w = winner.get()
if (w != 0 && w != i) {
job.cancel()
break
}
i++
}
}
}
}https://stackoverflow.com/questions/69503559
复制相似问题