首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带项目反应器的反应流的递归

带项目反应器的反应流的递归
EN

Stack Overflow用户
提问于 2017-12-16 23:24:51
回答 1查看 3.6K关注 0票数 6

我的目标是使用reactive streams和Project Reactor遍历目录图并记录它们的所有名称。

因为文件系统是远程的,所以对它的调用是阻塞的。因此,我希望将阻塞调用的执行与非阻塞异步代码的其余部分分开。我使用的是这个建议:http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

下面是我需要遍历的结构:

代码语言:javascript
复制
/
  /jupiter
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

  /earth
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

  /mars
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

这是我到目前为止想出的代码:

代码语言:javascript
复制
public class ReactorEngine {

    private static Logger log = LoggerFactory.getLogger(ReactorEngine.class);

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Server server = new Server();

        Flux.fromIterable(server.getChildren("/"))
            .flatMap(parent -> Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()))
            .publishOn(Schedulers.elastic())
            .doOnTerminate(latch::countDown)
            .subscribe(ReactorEngine::handleResponse);

        latch.await();
    }

    private static void handleResponse(List<String> value) {
        log.info("Received: " + value);
    }

}

public class Server {

    public List<String> getChildren(final String path) {
        // Generate some I/O
        ...
    }
}

因此,我从顶层目录开始,并异步请求向下的第一层目录(它们的子目录)。一切都很顺利,下面是输出:

代码语言:javascript
复制
15:35:05.902 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:35:07.062 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:35:07.140 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:35:07.140 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:35:07.140 [elastic-5] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:35:08.140 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/earth/phase-1/, /earth/phase-2/, /earth/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/jupiter/phase-1/, /jupiter/phase-2/, /jupiter/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/mars/phase-1/, /mars/phase-2/, /mars/phase-3/]

现在我的问题是,如何将作为结果返回的元素放回flux中,以便引擎将递归地调用server.getChildren(父级),直到遍历完整个目录图?

实际上递归是可行的吗,还是有一种更好的“反应式”方法来实现这一点,比如通过操作符?

谢谢!

编辑

西蒙建议的expand(Function)操作符可以很好地遍历图形。我已经将代码更改为:

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Server server = new Server();

    Flux.fromIterable(server.getChildren("/"))
        .expand(p -> Flux.fromIterable(server.getChildren(p)).subscribeOn(Schedulers.elastic()))
        .publishOn(Schedulers.elastic())
        .doOnTerminate(latch::countDown)
        .subscribe(ReactorEngine::handleResponse);

    latch.await();
}

但是,我失去了异步调用服务器的阻塞server.getChildren(String)方法的方式。正如您在这些日志中看到的,每个子目录都是同步获取的,每秒一次:

代码语言:javascript
复制
15:57:55.398 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:57:56.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:57:56.593 [main] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:57:56.593 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/
15:57:57.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:57:57.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/
15:57:58.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:57:58.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/
15:57:59.599 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-1/...
15:57:59.599 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-1/
15:58:00.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-2/...
15:58:00.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-2/
15:58:01.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-3/...
15:58:01.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-3/
15:58:02.601 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-4/...
15:58:02.601 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-4/
15:58:03.602 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-1/...
15:58:03.603 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-1/
15:58:04.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-2/...
15:58:04.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-2/
15:58:05.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-3/...
15:58:05.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-3/
15:58:06.605 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-4/...
15:58:06.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-4/
15:58:07.605 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-1/...
15:58:07.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-1/
15:58:08.606 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-2/...
15:58:08.606 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-2/
15:58:09.607 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-3/...
15:58:09.607 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-3/
15:58:10.608 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-4/...
15:58:10.608 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-4/

您能否提供一个提示,说明如何将对Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic())的调用重新纳入方案中?没有我可以调用的Flux.fromCallable(),也许这是一个很好的理由。

但是,由于我对反应式编程和Project Reactor的概念非常陌生,因此很难理解这种异步方式。

谢谢。

EN

回答 1

Stack Overflow用户

发布于 2017-12-18 18:01:08

这里有一个运算符:)看看expandexpandDeep

票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47847132

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档