首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >停止无限流量

停止无限流量
EN

Stack Overflow用户
提问于 2020-08-26 22:45:18
回答 1查看 547关注 0票数 0

我有一个在两个(不同)客户端之间中继的服务器。当用户(第一客户端,通过websockets)发送消息时,服务器需要每X毫秒向设备(第二客户端)重复该消息,直到接收到新消息,或者关闭websocket。

我将websocket作为流使用,并创建了以下操作符:

代码语言:javascript
复制
fun <T> flowEvery(value: T, everMilliSeconds: Long): Flow<T> =
    flow {
        while (true) {
            emit(value)
            delay(everMilliSeconds)
        }
    }.cancellable()

@ExperimentalCoroutinesApi
fun <T> Flow<T>.repeatEvery(mSec: Long): Flow<T> =
    this.flatMapLatest {
        flowEvery(it, mSec)
    }

问题是,一旦套接字关闭,最后一条消息就会一直发送下去。

我的调用点是:

代码语言:javascript
复制
try {
    oscConnections.sendTo(
        deviceIdentifier,
        incoming.consumeAsFlow().repeatEvery(50).mapNotNull { frame ->
            when (frame) {
                is Frame.Text -> listOf(frame.readText().toFloat())
                else -> null
            }
        })
} finally {
    close(CloseReason(CloseReason.Codes.NORMAL, "Ended"))
}

incoming通道已关闭(调用onCompletion),但发送到sendTo的流未关闭。UDP它自己使用输入流,并为它使用的每个元素发送sendTo消息。

如何强制流停止?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-27 02:25:17

通过使用flatMapLatesttransformLatest,您可以将上游流的最后一个值替换为永不结束的流。

您必须以某种方式停止该流,并在协程中到处使用CancellationException,以发出取消协程的信号。您可以将永不结束的流逻辑包装在coroutineScope中,以便在上游流完成后仅精确取消该作用域。

代码语言:javascript
复制
fun <T> Flow<T>.repeatEvery(delay: Long): Flow<T> =
    flow<T> {
        try {
            coroutineScope {
                onCompletion { this@coroutineScope.cancel() }
                    .transformLatest { value ->
                        while (true) {
                            emit(value)
                            delay(delay)
                        }
                    }
                    .collect(::emit)
            }
        }
        catch (e: CancellationException) {
            // done
        }
    }

PS:在您的示例中,.cancellable()并没有做太多事情。根据文档,使用flow { … }等流构建器构建的流可以自动取消。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63600153

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档