首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Reactor:展开ParallelFlux

Reactor:展开ParallelFlux
EN

Stack Overflow用户
提问于 2019-02-04 18:14:45
回答 2查看 861关注 0票数 1

我有一个需要扩展的项目集合,所以我选择reactor是因为它的反应能力,因为扩展需要IO操作。

下面是一段工作代码:

代码语言:javascript
复制
public Flux<Item> expand(List<Item> unprocessedItems) {
  return Flux.fromIterable(unprocessedItems)
    .expandDeep(this::expandItem);
}

注意,this::expandItem是一个阻塞操作(多个数据库查询,一些计算,...)。现在我希望这个扩展是并行的,但据我所知,.expand().expandDeep()只是Flux类的成员,而不是ParallelFlux类的成员。我尝试在.expand()调用之前添加.publishOn().subscribeOn(),但没有成功。

这是我第一次使用reactor,但我没有看到任何阻止并行扩展的技术问题,有什么方法可以做到吗?是API遗漏了还是我遗漏了什么?

EN

回答 2

Stack Overflow用户

发布于 2019-02-04 22:05:55

是的,您是对的,ParallelFlux没有.expand().expandDeep()方法,但我可以使用其他方法,创建具有expand方法的附加Publisher并将其传递给您的ParallelFlux,如下所示:

代码语言:javascript
复制
public static void main(String[] args) {      

    Function<Node, Flux<Node>> expander =
        node -> Flux.fromIterable(node.children);

    List<Node> roots = createTestNodes();

    Flux.fromIterable(roots)
        .parallel(4)
        .runOn(Schedulers.parallel())
        .flatMap(node -> Flux.just(node).expandDeep(expander))
        .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))
        .sequential()
        .subscribe();

    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("finished");

}

我的测试数据:

代码语言:javascript
复制
static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... nodes) {
        this.name = name;
        this.children = new ArrayList<>();
        children.addAll(Arrays.asList(nodes));
    }

    @Override
    public String toString() {
        return name;
    }
}

static List<Node> createTestNodes() {
    return new Node("root",
        new Node("1",
            new Node("11")
        ),
        new Node("2",
            new Node("21"),
            new Node("22",
                new Node("221")
            )
        ),
        new Node("3",
            new Node("31"),
            new Node("32",
                new Node("321")
            ),
            new Node("33",
                new Node("331"),
                new Node("332",
                    new Node("3321")
                )
            )
        ),
        new Node("4",
            new Node("41"),
            new Node("42",
                new Node("421")
            ),
            new Node("43",
                new Node("431"),
                new Node("432",
                    new Node("4321")
                )
            ),
            new Node("44",
                new Node("441"),
                new Node("442",
                    new Node("4421")
                ),
                new Node("443",
                    new Node("4431"),
                    new Node("4432")
                )
            )
        )
    ).children;
}

和结果:

代码语言:javascript
复制
Time: 1549296674522 thread: parallel-4 value: 4
Time: 1549296674523 thread: parallel-4 value: 41
Time: 1549296674523 thread: parallel-2 value: 2
Time: 1549296674523 thread: parallel-2 value: 21
Time: 1549296674523 thread: parallel-3 value: 3
Time: 1549296674523 thread: parallel-3 value: 31
Time: 1549296674523 thread: parallel-1 value: 1
Time: 1549296674523 thread: parallel-1 value: 11
Time: 1549296674525 thread: parallel-2 value: 22
Time: 1549296674525 thread: parallel-2 value: 221
Time: 1549296674526 thread: parallel-3 value: 32
Time: 1549296674526 thread: parallel-3 value: 321
Time: 1549296674526 thread: parallel-3 value: 33
Time: 1549296674526 thread: parallel-3 value: 331
Time: 1549296674526 thread: parallel-3 value: 332
Time: 1549296674526 thread: parallel-3 value: 3321
Time: 1549296674526 thread: parallel-4 value: 42
Time: 1549296674526 thread: parallel-4 value: 421
Time: 1549296674526 thread: parallel-4 value: 43
Time: 1549296674526 thread: parallel-4 value: 431
Time: 1549296674526 thread: parallel-4 value: 432
Time: 1549296674526 thread: parallel-4 value: 4321
Time: 1549296674527 thread: parallel-4 value: 44
Time: 1549296674527 thread: parallel-4 value: 441
Time: 1549296674527 thread: parallel-4 value: 442
Time: 1549296674527 thread: parallel-4 value: 4421
Time: 1549296674528 thread: parallel-4 value: 443
Time: 1549296674528 thread: parallel-4 value: 4431
Time: 1549296674528 thread: parallel-4 value: 4432

正如您所看到的,expander在并行线程中工作。

票数 1
EN

Stack Overflow用户

发布于 2021-11-06 07:21:00

这里有一个例子,基于YauhenBalykin给出的例子

代码语言:javascript
复制
public static void main(String[] args) {

    Function<Node, Flux<Node>> expander =
            node -> Flux.fromIterable(node.children)
            .subscribeOn(Schedulers.parallel());

    List<Node> roots = createTestNodes();

    Flux.fromIterable(roots)
            .expand(expander)
            .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))
            .subscribe();

    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("finished");

}

测试数据:

代码语言:javascript
复制
static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... nodes) {
        this.name = name;
        this.children = new ArrayList<>();
        children.addAll(Arrays.asList(nodes));
    }

    @Override
    public String toString() {
        return name;
    }
}

static List<Node> createTestNodes() {
    return new Node("root",
            new Node("1",
                    new Node("11")
            ),
            new Node("2",
                    new Node("21"),
                    new Node("22",
                            new Node("221")
                    )
            ),
            new Node("3",
                    new Node("31"),
                    new Node("32",
                            new Node("321")
                    ),
                    new Node("33",
                            new Node("331"),
                            new Node("332",
                                    new Node("3321")
                            )
                    )
            ),
            new Node("4",
                    new Node("41"),
                    new Node("42",
                            new Node("421")
                    ),
                    new Node("43",
                            new Node("431"),
                            new Node("432",
                                    new Node("4321")
                            )
                    ),
                    new Node("44",
                            new Node("441"),
                            new Node("442",
                                    new Node("4421")
                            ),
                            new Node("443",
                                    new Node("4431"),
                                    new Node("4432")
                            )
                    )
            )
    ).children;
}

结果:

代码语言:javascript
复制
Time: 1636182895717 thread: main value: 1
Time: 1636182895754 thread: main value: 2
Time: 1636182895754 thread: main value: 3
Time: 1636182895754 thread: main value: 4
Time: 1636182895761 thread: parallel-1 value: 11
Time: 1636182895761 thread: parallel-2 value: 21
Time: 1636182895761 thread: parallel-2 value: 22
Time: 1636182895762 thread: parallel-3 value: 31
Time: 1636182895762 thread: parallel-3 value: 32
Time: 1636182895762 thread: parallel-3 value: 33
Time: 1636182895762 thread: parallel-4 value: 41
Time: 1636182895762 thread: parallel-4 value: 42
Time: 1636182895762 thread: parallel-4 value: 43
Time: 1636182895762 thread: parallel-4 value: 44
Time: 1636182895764 thread: parallel-7 value: 221
Time: 1636182895764 thread: parallel-9 value: 321
Time: 1636182895764 thread: parallel-10 value: 331
Time: 1636182895765 thread: parallel-10 value: 332
Time: 1636182895765 thread: parallel-12 value: 421
Time: 1636182895765 thread: parallel-1 value: 431
Time: 1636182895765 thread: parallel-1 value: 432
Time: 1636182895766 thread: parallel-2 value: 441
Time: 1636182895766 thread: parallel-2 value: 442
Time: 1636182895766 thread: parallel-2 value: 443
Time: 1636182895766 thread: parallel-6 value: 3321
Time: 1636182895767 thread: parallel-9 value: 4321
Time: 1636182895767 thread: parallel-11 value: 4421
Time: 1636182895767 thread: parallel-12 value: 4431
Time: 1636182895767 thread: parallel-12 value: 4432
finished
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54513970

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档