我正在开发一个需要并行周期性地检查多个资源的程序:
public class JobRunner {
private final SensorService sensorService;
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
public void run() {
sensorService.finalAll().forEach(sensor -> {
Runnable task = () -> {
// read and save new data to log
List<Double> data = sensor.fetchNewData();
this.save(data);
};
// execute every 10 sec
executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.SECONDS);
});
}
public void save(List<Double> data) {
// ...
}
}findAll调用返回大约50个传感器的列表,但是当我运行该程序时,我看到虽然在第一个周期查询了所有传感器,但在随后的执行中(例如,在20秒、30秒等),只调用了2-3个传感器。我在想,由于一些传感器比其他传感器返回得更快,它们会更早地完成任务的等待周期,并被池中的下一个线程占用,从而使其他完成较慢的任务变得饥饿。
如何确保所有任务(传感器)都得到平等对待?这里有哪些最佳实践;我应该使用作业队列还是不同的并发机制?谢谢。
发布于 2017-02-05 15:49:22
在您的代码中有N=count service.findAll()计时器,这使得调试和测试更加困难。此外,不能保证在合理的时间内旧任务将被执行,并且不会被新任务超越。如果你
触发时同时通过传感器
请看下一段代码作为示例。它每10秒打印50个整数,然后停止打印。并行是通过Stream API实现的
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
IntStream.range(0, 50).parallel().forEach(i -> System.out.print(i + " "));
System.out.println();
}
}, 0, 10, TimeUnit.SECONDS);您可以将ScheduledExecutorService替换为Timer,以使代码更清晰。而且,作为一种选择,您可以不使用并行流,而是使用另一个ExecutorService,在Timer上将下一个N任务提交给它,并等待它们完成:
ExecutorService workerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < 50; i++) {
final int index = i;
Future<Void> future = workerExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
System.out.print(index + " ");
return null;
}
});
futures.add(future);
}
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException|ExecutionException e) {
throw new RuntimeException();
}
}
System.out.println();
}
}, 0, 10_000);https://stackoverflow.com/questions/42012844
复制相似问题