首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在反应堆3中编写操作符?

如何在反应堆3中编写操作符?
EN

Stack Overflow用户
提问于 2017-08-17 15:26:20
回答 1查看 1.8K关注 0票数 2

我通过实现org.reactivestreams.Publisher实现了一个反应堆操作符,如下所示。然而,我不知道这是否是™使用反应堆的正确方式。手工实现订阅服务器看起来有点麻烦。在这方面,运算符类似乎没有帮助。

代码语言:javascript
复制
class MyOperator implements Publisher<Integer> {

    private final Publisher<Integer> source;

    public MyOperator(Publisher<Integer> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Subscriber<? super Integer> target) {
        final Subscriber<Integer> wrappedSubscriber = createSubscriber(target);
        source.subscribe(wrappedSubscriber);
    }

    private Subscriber<Integer> createSubscriber(Subscriber<? super Integer> target) {
        return new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                target.onNext(integer + 1); // actual behaviour
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onError(Throwable t) {
                target.onError(t);
            }

            @Override
            public void onComplete() {
                target.onComplete();
            }
        };
    }
}

或者下面的示例是正确的™方式?

代码语言:javascript
复制
class MyCompactOperator implements Publisher<Integer> {

    final Flux<Integer> flux;

    public MyCompactOperator(Publisher<Integer> source) {
        flux = Flux.from(source).map(number -> number + 1);
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        flux.subscribe(subscriber);
    }
}

至少这需要更少的代码。

如西蒙·巴斯莱( Simoné)的建议,以Flux为源的变体3:

代码语言:javascript
复制
class MyFluxOperator extends Flux<Integer> {

    private final Flux<Integer> source;

    public MyFluxOperator(Flux<Integer> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        source.map(number -> number + 1).subscribe(subscriber);
    }
}

所有实现都如预期的那样工作:

代码语言:javascript
复制
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
Flux.from(new MyOperator(source)).subscribe(System.out::println);

// for variant 3
new MyFluxOperator(source).subscribe(System.out::println);

我在第二行中使用了一个Flux,以避免另一个订阅服务器的实现。

输出:

代码语言:javascript
复制
2
3
4
5
6

问题:

  • 我的执行过程中缺少什么东西吗?
  • 是否有更好的方法(使用更少的代码、更好的错误处理或其他方法)来实现反应堆3中的操作符?
  • 两种方法之间是否存在相关的功能性或非功能性差异?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-17 21:55:50

看到您的第二个选项,您似乎认为您必须实现一个发布者。情况肯定不是这样(恰恰相反)。从反应堆流量源(或发行者+反应堆的Flux.from)开始,简单地在map中建立链。

编辑:为了澄清您不想创建任何类,只需在主代码路径中这样做:

  • 如果您的source已经是FluxMono: 通量incrementedSource = source.map(i -> I+ 1);incrementedSource.subscribe(用户);
  • 如果您的source是另一种Publisher: 通量incrementedSource =Flux.from(源).map(I -> I+ 1);incrementedSource.subscribe(订户);

像反应堆这样的库的整个想法是给你可以直接写作而不需要写Publisher的操作符。

如果您希望找到一种方法来实现代码的互操作,因为您经常将一组运算符应用于各种Flux,那么请查看transformcompose (以及参考文献)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45739200

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档