我有一个不断生成和存储新数据值的类(使用线程池)。我希望为客户端代码(“订阅者”)提供一种方法来利用(连接到)新数据值的序列。但是,如果我的类没有客户端,或者所有客户端都完成了序列的读取,我希望它继续生成和存储新值,而不是停止。当客户端连接到该序列时,它会接收新生成的值,但不会接收过去生成的值。哪个项目反应器类(或多个类)适合做这件事?
我想我需要使用Flux来表示新值的序列,但是使用哪个Flux类(或工厂方法)呢?
发布于 2019-01-23 21:17:16
使用DirectProcessor
据我所知,需要的是订阅上游的能力,无论是否有订阅者的its。
在DirectProcessor的支持下,这是可以实现的。由于Processor是Publisher和Subscriber的组合,因此它可以“运行”在上游并连续侦听传入的信号。同时,DirectProcessor支持消息解复用,或者简单地将消息广播到所有可用的下游订户(如果他们正在监听)。
例如,让我们考虑以下代码示例:
Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(500)).log("upstream");
DirectProcessor processor = DirectProcessor.create();
intervalFlux.subscribe(processor);
Thread.sleep(2000);
Disposable downstream1 = processor.log("downstream1")
.subscribe();
Thread.sleep(1000);
downstream1.dispose();
Thread.sleep(1000);
Disposable downstream2 = processor.log("downstream2")
.subscribe();
Thread.sleep(2000);正如我们所看到的,我们使用processor订阅了上游,因此interval Flux开始生成数据。然后我们订阅了processor并等待了1秒,因此downstream1应该观察到两个事件,而log("upstream")操作员一般会记录6个事件。在那之后,我们取消了订阅,这样downstream1订阅者应该停止观察任何事件,但log("upstream")仍然应该观察间隔。然后,在又一次暂停之后,我们使用另一个downstrea2订阅者订阅了流,该订阅者应该观察另外四个事件。
上述代码的一般输出如下:
2019-01-23 15:09:04,246 INFO upstream [main] onSubscribe(FluxInterval.IntervalRunnable)
2019-01-23 15:09:04,249 INFO upstream [main] request(unbounded)
2019-01-23 15:09:04,757 INFO upstream [parallel-1] onNext(0)
2019-01-23 15:09:05,252 INFO upstream [parallel-1] onNext(1)
2019-01-23 15:09:05,751 INFO upstream [parallel-1] onNext(2)
2019-01-23 15:09:06,252 INFO upstream [parallel-1] onNext(3)
2019-01-23 15:09:06,258 INFO downstream1 [main] onSubscribe(DirectProcessor.DirectInner)
2019-01-23 15:09:06,258 INFO downstream1 [main] request(unbounded)
2019-01-23 15:09:06,754 INFO upstream [parallel-1] onNext(4)
2019-01-23 15:09:06,755 INFO downstream1 [parallel-1] onNext(4)
2019-01-23 15:09:07,254 INFO upstream [parallel-1] onNext(5)
2019-01-23 15:09:07,254 INFO downstream1 [parallel-1] onNext(5)
2019-01-23 15:09:07,263 INFO downstream1 [main] cancel()
2019-01-23 15:09:07,755 INFO upstream [parallel-1] onNext(6)
2019-01-23 15:09:08,255 INFO upstream [parallel-1] onNext(7)
2019-01-23 15:09:08,265 INFO downstream2 [main] onSubscribe(DirectProcessor.DirectInner)
2019-01-23 15:09:08,265 INFO downstream2 [main] request(unbounded)
2019-01-23 15:09:08,755 INFO upstream [parallel-1] onNext(8)
2019-01-23 15:09:08,756 INFO downstream2 [parallel-1] onNext(8)
2019-01-23 15:09:09,255 INFO upstream [parallel-1] onNext(9)
2019-01-23 15:09:09,256 INFO downstream2 [parallel-1] onNext(9)
2019-01-23 15:09:09,751 INFO upstream [parallel-1] onNext(10)
2019-01-23 15:09:09,751 INFO downstream2 [parallel-1] onNext(10)
2019-01-23 15:09:10,255 INFO upstream [parallel-1] onNext(11)
2019-01-23 15:09:10,255 INFO downstream2 [parallel-1] onNext(11)正如我们所看到的,DirectProcessor启用了所需的行为,因此它很可能适合那里。
备注
DirectProcessor不支持背压,因此在背压很重要的情况下,可以在那里使用limitRate operator操作符。
另请参阅
https://projectreactor.io/docs/core/release/reference/#_direct_processor
https://stackoverflow.com/questions/54327228
复制相似问题