Flux.create和Flux.generate有什么区别?我正在寻找--理想情况下是一个示例用例--来理解我什么时候应该使用其中一个。
发布于 2018-04-26 00:41:26
简而言之:
Flux::create不会对应用程序状态的变化做出反应,而Flux::generate会。
长版本
Flux::create
当你想要计算多个(0...infinity)值时,你将使用它,这些值不受你的应用状态和你的管道状态的影响(你的管道==在Flux::create == ==之后的操作链)。
为什么?因为您发送给Flux::create的方法一直在计算元素(或无)。下游将决定它想要多少元素(elements == next signals),如果他跟不上,那些已经发出的元素将以某种策略被移除/缓冲(默认情况下,它们将被缓冲,直到下游要求更多)。
第一个也是最简单的用例是发送值,理论上,这些值可以加到一个集合中,然后才能获取每个元素并对其执行某些操作:
Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
/* get all the latest article from a server and emit them one by one to downstream. */
List<String> articals = getArticalsFromServer();
articals.forEach(sink::next);
});正如您所看到的,Flux.create用于阻塞方法(getArticalsFromServer)与异步代码之间的交互。
我相信Flux.create还有其他用例。
Flux::generate
Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
synchronousSink.next(1);
})
.doOnNext(number -> System.out.println(number))
.doOnNext(number -> System.out.println(number + 4))
.subscribe();输出将为1 5 1 5 1 5................forever
在您发送给Flux::generate的方法的每个调用中,synchronousSink只能发出:onSubscribe onNext? (onError | onComplete)?。
这意味着Flux::generate将按需计算并发出的值。你应该什么时候使用它?在计算下游可能不会使用的元素或您发出的事件受应用程序状态或管道状态影响的情况下(您的管道==位于Flux::create ==下游之后的操作链)。
例如,如果您正在构建torrent应用程序,那么您将实时接收数据块。您可以使用Flux::generate将任务(要下载的块)分配给多个线程,并且只在某个线程请求时才会在Flux::generate中计算您想要下载的块。因此,您将只发出您没有的块。使用Flux::create的相同算法将失败,因为Flux::create将发出我们没有的所有块,如果一些块无法下载,那么我们就有问题了。因为Flux::create不会对应用程序状态的变化做出反应,而Flux::generate会。
发布于 2019-12-07 00:16:51
Create:

对于每个Consumer<FluxSink<T>>
生成:

根据下游需求一次又一次地调用Consumer<SynchronousSink<T>>
有关更多详细信息,请查看此blog。
https://stackoverflow.com/questions/49951060
复制相似问题