在我所参与的项目中,有时不得不实现经典的并发生产者-消费者解决方案,这在很大程度上减少了问题,因为有一些从多个线程填充的集合,这些集合被多个消费者消费。简而言之,集合限制为10k个实体,一旦达到缓冲区大小,提交的工作任务将消耗这10k个实体,这类工作任务的限制是设置为10个,在最坏的情况下,这意味着我最多可以有10个工作任务,每个工作任务使用10k个实体。
我确实必须在这里和那里尝试一些锁定,围绕缓冲区溢出进行一些检查(当生产者生成太多数据,而所有工作者都在忙于处理他们的块时),因此必须丢弃新的事件以避免OOM (不是最好的解决方案,但稳定性是p1;)
这些天来,我环顾着reactor和一种使用它的方法,而不是去低级,做上面描述的所有事情,所以愚蠢的问题是:“reactor可以用于这个用例吗?”现在先忘掉溢出/丢弃..作为一家广播公司,我如何才能实现N个消费者?
特别是在使用buffer +线程池调度程序的广播器周围:
void test() {
final Broadcaster<String> sink = Broadcaster.create(Environment.initialize());
Dispatcher dispatcher = Environment.newDispatcher(2048, 20, DispatcherType.WORK_QUEUE);
sink
.buffer(100)
.consumeOn(dispatcher, this::log);
for (int i=0; i<100000; i++) {
sink.onNext("elementent " + i);
if (i%1000 == 0) {
System.out.println("addded elements " + i);
}
}
}
void log(List<String> values) {
System.out.print("simulating slow processing....");
System.out.println("processing: " + Arrays.toString(values.toArray()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}我在这里的意图是让广播公司执行日志(..)在异步方式下,当缓冲区大小达到时,它看起来总是在执行log(...)它处于阻塞模式。执行100之后,执行下一个100,依此类推。我怎么才能让它异步呢?
感谢vyvalyty
发布于 2016-03-30 21:14:36
一种可能的模式是在publishOn中使用flatMap:
Flux.range(1, 1_000_000)
.buffer(100)
.flatMap(b -> Flux.just(b).publishOn(SchedulerGroup.io())
.doOnNext(this::log))
.consume(...);https://stackoverflow.com/questions/36109839
复制相似问题