首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java ScheduledExecutorService -在多个并行任务中防止饥饿

Java ScheduledExecutorService -在多个并行任务中防止饥饿
EN

Stack Overflow用户
提问于 2017-02-03 05:44:40
回答 1查看 526关注 0票数 0

我正在开发一个需要并行周期性地检查多个资源的程序:

代码语言:javascript
复制
public class JobRunner {

    private final SensorService sensorService;
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    public void run() {
        sensorService.finalAll().forEach(sensor -> {
            Runnable task = () -> {
                // read and save new data to log
                List<Double> data = sensor.fetchNewData();
                this.save(data);
            };

            // execute every 10 sec
            executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.SECONDS);
        });
    }

    public void save(List<Double> data) {
        // ...
    }
}

findAll调用返回大约50个传感器的列表,但是当我运行该程序时,我看到虽然在第一个周期查询了所有传感器,但在随后的执行中(例如,在20秒、30秒等),只调用了2-3个传感器。我在想,由于一些传感器比其他传感器返回得更快,它们会更早地完成任务的等待周期,并被池中的下一个线程占用,从而使其他完成较慢的任务变得饥饿。

如何确保所有任务(传感器)都得到平等对待?这里有哪些最佳实践;我应该使用作业队列还是不同的并发机制?谢谢。

EN

回答 1

Stack Overflow用户

发布于 2017-02-05 15:49:22

在您的代码中有N=count service.findAll()计时器,这使得调试和测试更加困难。此外,不能保证在合理的时间内旧任务将被执行,并且不会被新任务超越。如果你

  1. 使用单个计时器触发传感器检查10s最后一次所有传感器检查已完成
  2. 当检查由timer

触发时同时通过传感器

请看下一段代码作为示例。它每10秒打印50个整数,然后停止打印。并行是通过Stream API实现的

代码语言:javascript
复制
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        IntStream.range(0, 50).parallel().forEach(i -> System.out.print(i + " "));
        System.out.println();
    }
}, 0, 10, TimeUnit.SECONDS);

您可以将ScheduledExecutorService替换为Timer,以使代码更清晰。而且,作为一种选择,您可以不使用并行流,而是使用另一个ExecutorService,在Timer上将下一个N任务提交给它,并等待它们完成:

代码语言:javascript
复制
ExecutorService workerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        List<Future<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            final int index = i;
            Future<Void> future = workerExecutor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    System.out.print(index + " ");
                    return null;
                }
            });
            futures.add(future);
        }
        for (Future<Void> future : futures) {
            try {
                future.get();
            } catch (InterruptedException|ExecutionException e) {
                throw new RuntimeException();
            }
        }
        System.out.println();
    }
}, 0, 10_000);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42012844

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档