首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Reactor sequential()似乎丢失了事件

Reactor sequential()似乎丢失了事件
EN

Stack Overflow用户
提问于 2019-05-01 21:51:51
回答 1查看 107关注 0票数 1

在Flux中,对流进行并行化,然后进行排序,似乎没有始终如一地强制顺序()方法的假定循环性质,并且似乎事件可能以非常混乱的顺序结束,以至于它们实际上丢失了。这种行为在多次运行中并不一致,一些运行是按顺序进行的,而另一些运行则变化很大。

我知道事件可能会在某种程度上失序,但即使是在一个简单的例子中,这种程度也足以使一些事件延迟到其有用的生命周期之外。

对于固定的数据集,这可能是完全可以接受的,但对于来自Kafka的事件流,这可能会导致数据丢失,很难调试。

在此示例中,在多次运行中,您可能会看到按顺序打印2-1000中的每个偶数,然后在另一次运行中看到一系列偶数,从2左右开始,直到1700,其中一些两位数字从未出现在序列中。

我已经改变了并行线程的数量,顺序预取,添加了publishOn和subscribeOn步骤,但似乎没有任何东西使这更多或更少可预测。

代码语言:javascript
复制
    Flux.range(1, 5000)
        .parallel(64)
        .runOn(Schedulers.newParallel("test", 64))
        .filter(integer -> integer % 2 == 0)
        .sequential()
        .take(500)
        .doOnNext(System.out::println)
        .blockLast();
  }

当然,在足够长的时间跨度内,每个值都会出现,但在实际情况下,一些事件可能会延迟太长时间而无法使用。

循环调度并不完美,但在我看来这并不是循环调度。我是不是做错了什么,或者这是一个更深层次的问题?

EN

回答 1

Stack Overflow用户

发布于 2019-05-07 21:54:04

我试过运行你的例子,每次我都会得到500个项目。

您不能期望它产生一个可预测的序列,因为处理是并行的,并且您的核心很可能少于这里使用的线程计数(64)。一些线程将得不到足够的CPU来完成它们的任务,而其他线程将因此而获胜,take(500)将选择获胜的号码。

parallel的分布是循环的,但处理取决于线程调度器。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55937343

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档