我有一个类,它使用actor来确保共享可变状态的线程安全。为了便于使用,我在这个actor上做了一个小包装:
interface Ref<T : Any> {
fun get(): T
fun transform(transformer: (T) -> T): Job
}这里,get使用runBlocking来阻塞,直到它获取T的实际值为止。
override fun get(): T = runBlocking {
val deferred = CompletableDeferred<T>()
launch {
actor.send(RefOperation.Get(deferred))
}
deferred.await()
}transform在没有runBlocking的情况下做类似的事情,只返回一个Job
override fun transform(transformer: (T) -> T): Job {
val job = Job()
launch {
actor.send(RefOperation.Transform(transformer, job))
}
return job
}在transform调用导致另一个调用之前,这是很好的:
ref.transform {
...
ref.transform {
}
}这里我有两个Job,但是如果我想要等待它们的完成,就无法将它们组合成一个Job,在它上我可以调用join()。
这方面的解决方案是结构化并发性,但是我不知道如何再创建我的actor,因为它被定义为CoroutineScope上的一个扩展。
如何在保持使用结构化并发的能力的同时继续使用actor?
注意,我已经创建了,因为我的项目是多平台的,并且对于除了JVM之外的目标,我使用替代的实现。
发布于 2020-01-19 03:29:05
actor处理项目的顺序与添加的顺序相同,并按顺序在单个协同线中执行。这意味着内部transform将在外部transform完成后进行处理,并且在使用actor时不能更改它(在actor中,我们不能启动更多的协同,因为我们将状态限制在单个线程上,否则就有可能重复处理顺序)。试图加入外部transform主体中的内部transform作业(如果我们将transform标记为挂起函数)只会导致死锁。
你对这种行为没意见吗?如果没有,则不要使用参与者或嵌套转换。如果是,请提供一些用例,在这些用例中,创建将在外部transform之后处理的嵌套transform是有意义的。
至于加入所有的工作,我有一些代码。在main中,我们有创建内部转换的外部转换。外部一个返回2,内部一个返回8,但是内部一个在外部一个完成后开始,所以结果是8。但正如您所希望的,transformJob.join() in main也在等待内部作业。
private sealed class RefOperation<T>
private class Get<T : Any>(val deferred: CompletableDeferred<T>) : RefOperation<T>()
private class Transform<T : Any>(val transformer: TransformStub<T>.(T) -> T, val stub: TransformStub<T>, val job: CompletableJob) : RefOperation<T>()
interface Ref<T : Any> {
fun get(): T
fun transform(transformer: TransformStub<T>.(T) -> T): Job
}
interface TransformStub<T : Any> {
fun transform(transformer: TransformStub<T>.(T) -> T): Job
}
private class TransformStubImpl<T : Any>(
val actor: SendChannel<RefOperation<T>>,
val scope: CoroutineScope
) : TransformStub<T> {
override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
return scope.launch {
val childJob: CompletableJob = Job()
val childStub = TransformStubImpl(actor, this)
actor.send(Transform(transformer, childStub, childJob))
childJob.join()
}
}
}
class RefImpl<T : Any>(initialValue: T) : Ref<T> {
private val actorJob = Job()
private val actorScope = CoroutineScope(actorJob)
private val actor = actorScope.actor<RefOperation<T>> {
var value: T = initialValue
for (msg in channel) {
when (msg) {
is Get -> {
println("Get! $value")
msg.deferred.complete(value)
}
is Transform -> {
with(msg) {
val newValue = stub.transformer(value)
println("Transform! $value -> $newValue")
value = newValue
job.complete()
}
}
}
}
}
override fun get(): T = runBlocking {
val deferred = CompletableDeferred<T>()
actor.send(Get(deferred))
deferred.await()
}
override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
val stub = TransformStubImpl(actor, GlobalScope)
return stub.transform(transformer)
}
}
fun main() = runBlocking<Unit> {
val ref: Ref<Int> = RefImpl(0)
val transformJob = ref.transform {
transform { 8 }
2
}
transformJob.join()
ref.get()
}https://stackoverflow.com/questions/59806101
复制相似问题