在Flux中,对流进行并行化,然后进行排序,似乎没有始终如一地强制顺序()方法的假定循环性质,并且似乎事件可能以非常混乱的顺序结束,以至于它们实际上丢失了。这种行为在多次运行中并不一致,一些运行是按顺序进行的,而另一些运行则变化很大。
我知道事件可能会在某种程度上失序,但即使是在一个简单的例子中,这种程度也足以使一些事件延迟到其有用的生命周期之外。
对于固定的数据集,这可能是完全可以接受的,但对于来自Kafka的事件流,这可能会导致数据丢失,很难调试。
在此示例中,在多次运行中,您可能会看到按顺序打印2-1000中的每个偶数,然后在另一次运行中看到一系列偶数,从2左右开始,直到1700,其中一些两位数字从未出现在序列中。
我已经改变了并行线程的数量,顺序预取,添加了publishOn和subscribeOn步骤,但似乎没有任何东西使这更多或更少可预测。
Flux.range(1, 5000)
.parallel(64)
.runOn(Schedulers.newParallel("test", 64))
.filter(integer -> integer % 2 == 0)
.sequential()
.take(500)
.doOnNext(System.out::println)
.blockLast();
}当然,在足够长的时间跨度内,每个值都会出现,但在实际情况下,一些事件可能会延迟太长时间而无法使用。
循环调度并不完美,但在我看来这并不是循环调度。我是不是做错了什么,或者这是一个更深层次的问题?
发布于 2019-05-07 21:54:04
我试过运行你的例子,每次我都会得到500个项目。
您不能期望它产生一个可预测的序列,因为处理是并行的,并且您的核心很可能少于这里使用的线程计数(64)。一些线程将得不到足够的CPU来完成它们的任务,而其他线程将因此而获胜,take(500)将选择获胜的号码。
parallel的分布是循环的,但处理取决于线程调度器。
https://stackoverflow.com/questions/55937343
复制相似问题