在我的应用程序中,我有多个scheduler线程来创建任务。例如,每个调度程序线程都可以创建一组任务:
TaskCreator tastCreator;
for (Report report: report) {
taskCreator.createTask(report);
}正如您从日志中看到的那样,调度程序线程可以并发运行:
15:57:20.107 INFO [ scheduler-4] c.task.ReportExportSchedulerTask : Task created
15:57:20.107 INFO [ scheduler-2] c.task.ReportExportSchedulerTask : Task created我有一个TaskCreator组件,如下所示,它将任务传递给executeJob()
@Component
public class TaskCreator {
@Autowired
private SftpTaskExecutor sftpTaskExecutor;
@Autowired
SftpConfig sftpConfig;
@Autowired
private SFTPConnectionManager connectionManager;
public void createTask(Report report) {
sftpTaskExecutor.executeJob(new JobProcessorTask(...));
}
public void validateTasksExecution() {
sftpTaskExecutor.getExecutorService().shutdown();
while (!sftpTaskExecutor.getExecutorService().isTerminated()) ;
connectionManager.disconnect();
}
}SftpTaskExecutor Component如下所示,它构造了一个executorService,我将上述任务提交给该executorService:
@Component
public class SftpTaskExecutor {
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public void executeJob(JobProcessorTask jobProcessorTask) {
executorService.execute(jobProcessorTask);
}
public ExecutorService getExecutorService() {
return executorService;
}
}我的问题是,如果两个或多个调度程序线程同时创建任务并提交给executor服务,则上述抛出一个RejectedExecutionException,其中一个调度器任务未完成(即文件未发送)。
对于每个调度线程,我需要能够在不干扰其他调度程序线程的情况下调用validateTasksExecution()。换句话说,不要在其他调度程序仍在处理时断开连接。
在这方面,我是否正确地使用了ExecutorService?如何将上面的内容更改为线程安全?
发布于 2017-10-25 19:57:12
我的问题是,如果两个或多个调度器线程同时创建任务并提交给executor服务,上述线程将抛出一个RejectedExecutionException,其中一个调度器任务未完成(即文件未发送)。
让我们来看看ExecutorService.execute(...)
RejectedExecutionException--如果不能接受此任务来执行。
在查看ThreadPoolExecutor (和相关的)代码时,这些作业被拒绝有两个原因:
我相信您的执行器服务已经关闭,很可能是因为您的第一个线程在第二个线程调用executeJob(...)之前调用了executeJob(...)。如果试图重用线程池,则代码是不正确的。您也要关闭connectionManager(),这让我想知道您是否想要重用SftpTaskExecutor。
如果希望每个线程查看其操作是否已完成,但线程池仍在运行,则需要从ExecutorService.submit(...)方法中保存ExecutorService.submit(...)并对其调用get()。这会告诉你什么时候工作完成了。
类似于:
public Future<Void> createTask(Report report) {
return sftpTaskExecutor.executeJob(new JobProcessorTask(...));
}
public void validateTasksExecution(Future<Void> future) {
// there is some exceptions here you need to handle
future.get();
}
public void shutdown() {
sftpTaskExecutor.shutdown();
connectionManager.disconnect();
}
...
public Future<Void> executeJob(JobProcessorTask jobProcessorTask) {
return executorService.submit(jobProcessorTask);
}如果您需要监视多个作业,那么您应该将它们存储在一个集合中,并对它们进行串行调用( get() ),尽管这些作业将并行运行。
另一种选择是为每个事务设置一个单独的ExecutorService,这是浪费的,但考虑到它正在管理sftp调用,这可能并不是很糟糕。
而(!sftpTaskExecutor.getExecutorService().isTerminated());
是啊,你不想那样旋转。见 javadocs。
https://stackoverflow.com/questions/46914811
复制相似问题