首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Reactor GroupedFlux -等待完成

Reactor GroupedFlux -等待完成
EN

Stack Overflow用户
提问于 2018-02-02 21:27:46
回答 2查看 10.5K关注 0票数 6

有一个像下面这样的异步发布程序,有没有办法让Project Reactor等待整个流完成处理?

当然,不需要为未知的持续时间添加睡眠...

代码语言:javascript
复制
@Test
public void groupByPublishOn() throws InterruptedException {
    UnicastProcessor<Integer> processor = UnicastProcessor.create();

    List<Integer> results = new ArrayList<>();
    Flux<Flux<Integer>> groupPublisher = processor.publish(1)
                                                  .autoConnect()
                                                  .groupBy(i -> i % 2)
                                                  .map(group -> group.publishOn(Schedulers.parallel()));

    groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));

    List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
    input.forEach(processor::onNext);
    processor.onComplete();

    Thread.sleep(500);

    Assert.assertTrue(results.size() == input.size());
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-02-03 21:42:49

您可以替换以下行:

代码语言:javascript
复制
 groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));

有了这个

代码语言:javascript
复制
groupPublisher.log()
              .flatMap(g -> g.log()
                             .doOnNext(results::add)
              )
              .blockLast();

flatMap是一种比订阅中订阅更好的模式,它将为您订阅群组。

doOnNext会处理消耗的副作用(向集合添加值),使您不必在订阅中执行该操作。

blockLast()替换了订阅,而不是让您为它阻塞的事件提供处理程序,直到完成(并返回最后发出的项,但您应该已经在doOnNext中处理了这一点)。

票数 6
EN

Stack Overflow用户

发布于 2018-02-04 18:19:46

使用blockLast()的主要问题是,如果您的操作无法完成,您将永远不会释放您的管道。

你需要做的就是获取Disposable并检查是否已经完成了流水线,这意味着boolean isDisposed将返回true。

然后,您可以决定是否需要超时,就像惰性计数实现一样:)

int count = 0;

代码语言:javascript
复制
@Test
public void checkIfItDisposable() throws InterruptedException {
    Disposable subscribe = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .map(number -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return number;
            }).subscribeOn(Schedulers.newElastic("1"))
            .subscribe();

    while (!subscribe.isDisposed() && count < 100) {
        Thread.sleep(400);
        count++;
        System.out.println("Waiting......");
    }
    System.out.println("It disposable:" + subscribe.isDisposed());

如果您想要使用blockLast,至少要添加一个超时

代码语言:javascript
复制
@Test
public void checkIfItDisposableBlocking() throws InterruptedException {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .map(number -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return number;
            }).subscribeOn(Schedulers.newElastic("1"))
            .blockLast(Duration.of(60, ChronoUnit.SECONDS));
    System.out.println("It disposable");
}

如果您需要更多ides https://github.com/politrons/reactive,可以在此处查看更多反应器示例

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48583716

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档