首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用reactor并发使用集合

使用reactor并发使用集合
EN

Stack Overflow用户
提问于 2016-03-20 11:13:32
回答 1查看 467关注 0票数 0

在我所参与的项目中,有时不得不实现经典的并发生产者-消费者解决方案,这在很大程度上减少了问题,因为有一些从多个线程填充的集合,这些集合被多个消费者消费。简而言之,集合限制为10k个实体,一旦达到缓冲区大小,提交的工作任务将消耗这10k个实体,这类工作任务的限制是设置为10个,在最坏的情况下,这意味着我最多可以有10个工作任务,每个工作任务使用10k个实体。

我确实必须在这里和那里尝试一些锁定,围绕缓冲区溢出进行一些检查(当生产者生成太多数据,而所有工作者都在忙于处理他们的块时),因此必须丢弃新的事件以避免OOM (不是最好的解决方案,但稳定性是p1;)

这些天来,我环顾着reactor和一种使用它的方法,而不是去低级,做上面描述的所有事情,所以愚蠢的问题是:“reactor可以用于这个用例吗?”现在先忘掉溢出/丢弃..作为一家广播公司,我如何才能实现N个消费者?

特别是在使用buffer +线程池调度程序的广播器周围:

代码语言:javascript
复制
void test() {
  final Broadcaster<String> sink =   Broadcaster.create(Environment.initialize());
  Dispatcher dispatcher = Environment.newDispatcher(2048, 20, DispatcherType.WORK_QUEUE);

  sink
    .buffer(100)
    .consumeOn(dispatcher, this::log);

  for (int i=0; i<100000; i++) {
    sink.onNext("elementent " + i);
    if (i%1000 == 0) {
      System.out.println("addded elements " + i);
    }
  }
}
 void log(List<String> values) {
  System.out.print("simulating slow processing....");
  System.out.println("processing: " + Arrays.toString(values.toArray()));
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

我在这里的意图是让广播公司执行日志(..)在异步方式下,当缓冲区大小达到时,它看起来总是在执行log(...)它处于阻塞模式。执行100之后,执行下一个100,依此类推。我怎么才能让它异步呢?

感谢vyvalyty

EN

回答 1

Stack Overflow用户

发布于 2016-03-30 21:14:36

一种可能的模式是在publishOn中使用flatMap:

代码语言:javascript
复制
Flux.range(1, 1_000_000)
.buffer(100)
.flatMap(b -> Flux.just(b).publishOn(SchedulerGroup.io())
   .doOnNext(this::log))
.consume(...);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36109839

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档