首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在保留Kotlin中进行结构化并发的能力的同时使用参与者?

如何在保留Kotlin中进行结构化并发的能力的同时使用参与者?
EN

Stack Overflow用户
提问于 2020-01-19 00:25:07
回答 1查看 220关注 0票数 1

我有一个类,它使用actor来确保共享可变状态的线程安全。为了便于使用,我在这个actor上做了一个小包装:

代码语言:javascript
复制
interface Ref<T : Any> {

    fun get(): T

    fun transform(transformer: (T) -> T): Job

}

这里,get使用runBlocking来阻塞,直到它获取T的实际值为止。

代码语言:javascript
复制
override fun get(): T = runBlocking {
    val deferred = CompletableDeferred<T>()
    launch {
        actor.send(RefOperation.Get(deferred))
    }
    deferred.await()
}

transform在没有runBlocking的情况下做类似的事情,只返回一个Job

代码语言:javascript
复制
override fun transform(transformer: (T) -> T): Job {
    val job = Job()
    launch {
        actor.send(RefOperation.Transform(transformer, job))
    }
    return job
}

transform调用导致另一个调用之前,这是很好的:

代码语言:javascript
复制
ref.transform {

  ...
  ref.transform {

  }
}

这里我有两个Job,但是如果我想要等待它们的完成,就无法将它们组合成一个Job,在它上我可以调用join()

这方面的解决方案是结构化并发性,但是我不知道如何再创建我的actor,因为它被定义为CoroutineScope上的一个扩展。

如何在保持使用结构化并发的能力的同时继续使用actor

注意,我已经创建了,因为我的项目是多平台的,并且对于除了JVM之外的目标,我使用替代的实现。

EN

回答 1

Stack Overflow用户

发布于 2020-01-19 03:29:05

actor处理项目的顺序与添加的顺序相同,并按顺序在单个协同线中执行。这意味着内部transform将在外部transform完成后进行处理,并且在使用actor时不能更改它(在actor中,我们不能启动更多的协同,因为我们将状态限制在单个线程上,否则就有可能重复处理顺序)。试图加入外部transform主体中的内部transform作业(如果我们将transform标记为挂起函数)只会导致死锁。

你对这种行为没意见吗?如果没有,则不要使用参与者或嵌套转换。如果是,请提供一些用例,在这些用例中,创建将在外部transform之后处理的嵌套transform是有意义的。

至于加入所有的工作,我有一些代码。在main中,我们有创建内部转换的外部转换。外部一个返回2,内部一个返回8,但是内部一个在外部一个完成后开始,所以结果是8。但正如您所希望的,transformJob.join() in main也在等待内部作业。

代码语言:javascript
复制
private sealed class RefOperation<T>
private class Get<T : Any>(val deferred: CompletableDeferred<T>) : RefOperation<T>()
private class Transform<T : Any>(val transformer: TransformStub<T>.(T) -> T, val stub: TransformStub<T>, val job: CompletableJob) : RefOperation<T>()

interface Ref<T : Any> {

    fun get(): T

    fun transform(transformer: TransformStub<T>.(T) -> T): Job

}

interface TransformStub<T : Any> {
    fun transform(transformer: TransformStub<T>.(T) -> T): Job
}

private class TransformStubImpl<T : Any>(
        val actor: SendChannel<RefOperation<T>>,
        val scope: CoroutineScope
) : TransformStub<T> {

    override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
        return scope.launch {
            val childJob: CompletableJob = Job()
            val childStub = TransformStubImpl(actor, this)
            actor.send(Transform(transformer, childStub, childJob))
            childJob.join()
        }
    }

}

class RefImpl<T : Any>(initialValue: T) : Ref<T> {

    private val actorJob = Job()
    private val actorScope = CoroutineScope(actorJob)
    private val actor = actorScope.actor<RefOperation<T>> {
        var value: T = initialValue
        for (msg in channel) {
            when (msg) {
                is Get -> {
                    println("Get! $value")
                    msg.deferred.complete(value)
                }
                is Transform -> {
                    with(msg) {
                        val newValue = stub.transformer(value)
                        println("Transform! $value -> $newValue")
                        value = newValue
                        job.complete()
                    }
                }
            }
        }
    }

    override fun get(): T = runBlocking {
        val deferred = CompletableDeferred<T>()
        actor.send(Get(deferred))
        deferred.await()
    }

    override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
        val stub = TransformStubImpl(actor, GlobalScope)
        return stub.transform(transformer)
    }

}

fun main() = runBlocking<Unit> {
    val ref: Ref<Int> = RefImpl(0)
    val transformJob = ref.transform {
        transform { 8 }
        2
    }
    transformJob.join()
    ref.get()
}
票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59806101

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档