我们使用夸克来处理这个运行在常规函数上的消息,因为我们基本上必须调用一个suspend函数。
fun process (msg:Message):Message{
val resFrom:Data = runBlocking{fetchDataFromDB(msg.key)}
val processedMsg = processRoutingKey(msg,resFrom)
return processedMsg
}我们希望以Uni (https://smallrye.io/smallrye-mutiny/getting-started/creating-unis)的形式获取数据,所以基本上我们想要返回
fun process (msg:Message){
val resFrom:Uni<Data> = ConvertUni {fetchDataFromDB(msg.key)}
}我们需要uni在下游一次处理一些数据,但是我们希望从方法返回一个Uni。
fun process (msg:Message):Uni<Message>{
val resFrom:Uni<Data> = ConvertUni {fetchDataFromDB(msg.key)}
val processed:Uni<Message> =process(msg,resfrom)
return processed
}发布于 2022-02-04 14:15:38
签名fun process(msg:Message): Uni<Message>意味着需要启动一些异步机制,并且会超过方法调用。这就像返回一个Future或Deferred一样。函数将立即返回,但基础处理尚未完成。
在协同世界中,这意味着你需要启动一个协同线。但是,与任何异步机制一样,它要求您注意它将在哪里运行,以及运行多长时间。这是由您用来启动协同线的CoroutineScope定义的,这就是为什么像async这样的协同构建器需要这样的范围。
因此,如果您希望函数启动一个比函数调用更长的协同线,则需要将一个CoroutineScope传递给函数:
fun CoroutineScope.process(msg:Message): Uni<Message> {
val uniResult = async { fetchDataFromDB(msg.key) }.asUni()
return process(msg, uniResult)
}在这里,Deferred<T>.asUni()由库兵变-科特林提供。在文档中给出的示例中,它们使用GlobalScope而不是要求调用方传递协同范围。这通常是一个不好的做法,因为它意味着你不能控制启动协同线的生命周期,如果你不小心的话,你可能会泄露一些东西。
接受CoroutineScope作为接收方意味着该方法的调用方可以选择该协同线的作用域,这将在适当时自动取消coroutine,并且还将定义运行coroutine的线程池/事件循环。
现在,考虑到这一点,您将看到您将在相同的API级别上使用coroutines和Uni的混合,这并不好。我建议您始终坚持挂起函数,直到您真的必须转换为Uni为止。
https://stackoverflow.com/questions/70986753
复制相似问题