我通过实现org.reactivestreams.Publisher实现了一个反应堆操作符,如下所示。然而,我不知道这是否是™使用反应堆的正确方式。手工实现订阅服务器看起来有点麻烦。在这方面,运算符类似乎没有帮助。
class MyOperator implements Publisher<Integer> {
private final Publisher<Integer> source;
public MyOperator(Publisher<Integer> source) {
this.source = source;
}
@Override
public void subscribe(Subscriber<? super Integer> target) {
final Subscriber<Integer> wrappedSubscriber = createSubscriber(target);
source.subscribe(wrappedSubscriber);
}
private Subscriber<Integer> createSubscriber(Subscriber<? super Integer> target) {
return new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
target.onNext(integer + 1); // actual behaviour
subscription.request(Long.MAX_VALUE);
}
@Override
public void onError(Throwable t) {
target.onError(t);
}
@Override
public void onComplete() {
target.onComplete();
}
};
}
}或者下面的示例是正确的™方式?
class MyCompactOperator implements Publisher<Integer> {
final Flux<Integer> flux;
public MyCompactOperator(Publisher<Integer> source) {
flux = Flux.from(source).map(number -> number + 1);
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
flux.subscribe(subscriber);
}
}至少这需要更少的代码。
如西蒙·巴斯莱( Simoné)的建议,以Flux为源的变体3:
class MyFluxOperator extends Flux<Integer> {
private final Flux<Integer> source;
public MyFluxOperator(Flux<Integer> source) {
this.source = source;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
source.map(number -> number + 1).subscribe(subscriber);
}
}所有实现都如预期的那样工作:
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
Flux.from(new MyOperator(source)).subscribe(System.out::println);
// for variant 3
new MyFluxOperator(source).subscribe(System.out::println);我在第二行中使用了一个Flux,以避免另一个订阅服务器的实现。
输出:
2
3
4
5
6问题:
发布于 2017-08-17 21:55:50
看到您的第二个选项,您似乎认为您必须实现一个发布者。情况肯定不是这样(恰恰相反)。从反应堆流量源(或发行者+反应堆的Flux.from)开始,简单地在map中建立链。
编辑:为了澄清您不想创建任何类,只需在主代码路径中这样做:
source已经是Flux或Mono:
通量incrementedSource = source.map(i -> I+ 1);incrementedSource.subscribe(用户);source是另一种Publisher:
通量incrementedSource =Flux.from(源).map(I -> I+ 1);incrementedSource.subscribe(订户);像反应堆这样的库的整个想法是给你可以直接写作而不需要写Publisher的操作符。
如果您希望找到一种方法来实现代码的互操作,因为您经常将一组运算符应用于各种Flux,那么请查看transform和compose (以及参考文献)。
https://stackoverflow.com/questions/45739200
复制相似问题