我有一个在两个(不同)客户端之间中继的服务器。当用户(第一客户端,通过websockets)发送消息时,服务器需要每X毫秒向设备(第二客户端)重复该消息,直到接收到新消息,或者关闭websocket。
我将websocket作为流使用,并创建了以下操作符:
fun <T> flowEvery(value: T, everMilliSeconds: Long): Flow<T> =
flow {
while (true) {
emit(value)
delay(everMilliSeconds)
}
}.cancellable()
@ExperimentalCoroutinesApi
fun <T> Flow<T>.repeatEvery(mSec: Long): Flow<T> =
this.flatMapLatest {
flowEvery(it, mSec)
}问题是,一旦套接字关闭,最后一条消息就会一直发送下去。
我的调用点是:
try {
oscConnections.sendTo(
deviceIdentifier,
incoming.consumeAsFlow().repeatEvery(50).mapNotNull { frame ->
when (frame) {
is Frame.Text -> listOf(frame.readText().toFloat())
else -> null
}
})
} finally {
close(CloseReason(CloseReason.Codes.NORMAL, "Ended"))
}incoming通道已关闭(调用onCompletion),但发送到sendTo的流未关闭。UDP它自己使用输入流,并为它使用的每个元素发送sendTo消息。
如何强制流停止?
发布于 2020-08-27 02:25:17
通过使用flatMapLatest或transformLatest,您可以将上游流的最后一个值替换为永不结束的流。
您必须以某种方式停止该流,并在协程中到处使用CancellationException,以发出取消协程的信号。您可以将永不结束的流逻辑包装在coroutineScope中,以便在上游流完成后仅精确取消该作用域。
fun <T> Flow<T>.repeatEvery(delay: Long): Flow<T> =
flow<T> {
try {
coroutineScope {
onCompletion { this@coroutineScope.cancel() }
.transformLatest { value ->
while (true) {
emit(value)
delay(delay)
}
}
.collect(::emit)
}
}
catch (e: CancellationException) {
// done
}
}PS:在您的示例中,.cancellable()并没有做太多事情。根据文档,使用flow { … }等流构建器构建的流可以自动取消。
https://stackoverflow.com/questions/63600153
复制相似问题