首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在项目反应堆中,对订阅用户论点的调用是连续的吗?

在项目反应堆中,对订阅用户论点的调用是连续的吗?
EN

Stack Overflow用户
提问于 2019-02-25 02:20:13
回答 1查看 388关注 0票数 0

使用以下代码:

代码语言:javascript
复制
flux.subscribe(consumer)

consumer的调用可能发生在不同的线程上,这取决于flux的构造方式(例如,使用subscribeOnpublishOn)。是否有保证,即使对consumer的调用可能发生在不同的线程上,调用是连续的,也就是说,每个调用都在下一个调用开始之前完成?

下面是一个更具体的例子(使用反应堆-Kafka):

代码语言:javascript
复制
val resultFlux: Flux<Pair<TopicPartition, Long>> = KafkaReceiver
    .create<K, V>(receiverOptions)
    .receive()
    .groupBy { m -> m.receiverOffset().topicPartition() }
    .flatMap { partitionFlux ->
        val parallelRoFlux = partitionFlux
                .publishOn(scheduler)
                .flatMapSequential(::processRecord, parallelism)
        parallelRoFlux.map { ro ->
            acknowledge(ro)
            Pair(ro.topicPartition(), ro.offset())
        }
    }

resultFlux.doOnNext { Thread.sleep(2000); log.info("doOnNext: $it") }
        .subscribe { Thread.sleep(1000); log.info("subscribe: $it") }

生成以下输出片段:

代码语言:javascript
复制
13:44:26.401 [elastic-6] INFO  consumerSvcFlow - Message_5>>>processed
13:44:28.402 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 15)
13:44:29.402 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 15)
13:44:29.435 [elastic-8] INFO  consumerSvcFlow - Message_8>>>processed
13:44:31.435 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 16)
13:44:32.436 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 16)
13:44:32.461 [elastic-6] INFO  consumerSvcFlow - Message_9>>>processed
13:44:34.462 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 17)
13:44:35.462 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 17)
13:44:35.494 [elastic-8] INFO  consumerSvcFlow - Message_15>>>processed
13:44:37.494 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 18)
13:44:38.495 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 18)
13:44:38.497 [elastic-6] INFO  consumerSvcFlow - Message_18>>>processed
13:44:40.498 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 19)
13:44:41.499 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 19)
13:44:41.539 [elastic-8] INFO  consumerSvcFlow - Message_19>>>processed
13:44:43.540 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 20)
13:44:44.540 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 20)

subscribe使用者参数的调用是顺序的,但有些调用是对线程弹性-6调用,有些调用是对线程弹性-8调用。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-26 08:22:01

是的,根据反应流规范,有这样的保证。

首先,调用可能发生在不同于调用subscribe()的线程上。但是所有的消费者调用都发生在同一个线程上。

其次,subscribe(Consumer<T>)方法中的值使用者实际上被认为是Subscriber中的一个onNext信号,因此规范强制这样的调用是针对彼此以及onCompleteonError信号进行序列化的。

编辑:现在您已经添加了一些代码段,其中包含两个线程的事实来自于publishOnflatMap中完成的操作。因此,groupBy的每个组都可以选择Scheduler的不同Worker (如果有很多)。因此,在这些内部序列中完成的处理可以并行执行。然而,结果在被flatMap合并时是序列化的=>,subscribe(Consumer)是顺序的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54858647

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档