首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Arrow +反应器内异步执行Monad理解

如何在Arrow +反应器内异步执行Monad理解
EN

Stack Overflow用户
提问于 2020-11-08 13:12:27
回答 1查看 156关注 0票数 0

在下面的代码中,每个helloX()方法都异步运行(它是一个在单独线程中运行的延迟Mono ),请参见下面的完整代码:

代码语言:javascript
复制
    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

但是,在日志中,我看到它们是安全运行的:

代码语言:javascript
复制
14:10:46.983 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:10:47.084 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey()
14:10:49.087 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey() - ready
14:10:49.090 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn()
14:10:54.091 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn() - ready
14:10:54.092 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary()
14:10:59.095 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary() - ready
hello Joey and hello John and hello Mary

一旦他们都完成了,我怎么能让他们并行地执行和汇总所有的结果?

带main方法()的完整代码:

代码语言:javascript
复制
class HelloServiceImpl : HelloService<ForMonoK> {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

    override fun helloJoey(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJoey()")
            sleep(2000)
            logger.info("helloJoey() - ready")
            Mono.just("hello Joey")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloJohn(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJohn()")
            sleep(5000)
            logger.info("helloJohn() - ready")
            Mono.just("hello John")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloMary(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloMary()")
            sleep(5000)
            logger.info("helloMary() - ready")
            Mono.just("hello Mary")
        }.subscribeOn(Schedulers.elastic()).k()
    }

}

fun main() {
    val countDownLatch = CountDownLatch(1)
    HelloServiceImpl().helloEverybody().fix().mono.subscribe {
        println(it)
        countDownLatch.countDown()
    }
    countDownLatch.await()
}

更新

我已经修改了将顺序操作与并行操作相结合的方法:

代码语言:javascript
复制
    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.async().fx.async {
            val j = helloJoey().bind()
            val j2= Dispatchers.IO
                    .parMapN(helloJohn(), helloMary()){ it1, it2 -> "$it1 and $it2" }
            "$j and $j2"
        }
    }

不幸的是,parMapN不能与ForMonoK一起使用:

代码语言:javascript
复制
Type inference failed: fun <A, B, C, D> CoroutineContext.parMapN(fa: Kind<ForIO, A>, fb: Kind<ForIO, B>, fc: Kind<ForIO, C>, f: (A, B, C) -> D): IO<D>
cannot be applied to
receiver: CoroutineDispatcher  arguments: (Kind<ForMonoK, String>,Kind<ForMonoK, String>,Kind<ForMonoK, String>,(String, String, String) -> String)

想法?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-11-08 14:45:10

flatMapmap一样,没有线程语义或并行性。您所追求的是parMapparTraverse,它们并行运行多个MonoK

这时,fx块就变得不必要了,因为它是为顺序操作设计的。你可以混合和匹配两者。

代码语言:javascript
复制
MonoK.async().fx.async {

  val result = 
    Dispatchers.IO
     .parMap(helloJoey(), helloMary()) { joe, mary -> ... }
     .bind()

  otherThing(result).bind()

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64738458

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档