我需要在Java语言中实现一个线程池(java.util.concurrent),它的线程数在空闲时处于某个最小值,当作业提交到线程池中时增长到一个上限(但不会超过这个值),当所有作业都完成并且没有更多作业提交时,线程池会缩小回下限。
你将如何实现这样的东西?我认为这将是一个相当常见的使用场景,但显然java.util.concurrent.Executors工厂方法只能创建固定大小的池和池,这些池在提交许多作业时会无限增长。ThreadPoolExecutor类提供了corePoolSize和maximumPoolSize参数,但它的文档似乎暗示,同时拥有多个corePoolSize线程的唯一方法是使用有界作业队列,在这种情况下,如果您已经到达maximumPoolSize线程,您将得到作业拒绝,而您必须自己处理?我想出了这个:
//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(minSize));
...
//submitting jobs
for (Runnable job : ...) {
while (true) {
try {
pool.submit(job);
System.out.println("Job " + job + ": submitted");
break;
} catch (RejectedExecutionException e) {
// maxSize jobs executing concurrently atm.; re-submit new job after short wait
System.out.println("Job " + job + ": rejected...");
try {
Thread.sleep(300);
} catch (InterruptedException e1) {
}
}
}
}我是不是忽略了什么?有没有更好的方法来做这件事?此外,根据个人的需求,上面的代码至少在(我认为) (total number of jobs) - maxSize作业完成之前不会完成,这可能是有问题的。因此,如果您希望能够将任意数量的作业提交到池中并立即继续,而不等待任何作业完成,那么如果没有一个专用的“作业求和”线程来管理所需的无界队列来容纳所有提交的作业,我不明白您如何做到这一点。如果你对ThreadPoolExecutor本身使用一个无界队列,它的线程数永远不会超过corePoolSize。
发布于 2012-06-29 00:58:32
一个可能对您有帮助的技巧是分配一个RejectedExecutionHandler,它使用相同的线程将作业提交到阻塞队列中。这将阻塞当前线程,并消除对某种循环的需要。
请看我的回答:
How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?
下面是从该答案中复制的拒绝处理程序。
final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// this will block if the queue is full
executor.getQueue().put(r);
}
});然后,只要您意识到您使用的有界阻塞队列在核心线程之上创建任何线程之前首先填满,您就应该能够利用core/max线程计数。因此,如果您有10个核心线程,并且希望第11个作业启动第11个线程,那么您将需要一个大小为0的阻塞队列(可能是一个SynchronousQueue)。我觉得这在其他优秀的ExecutorService类中是一个真正的限制。
发布于 2012-06-29 01:40:39
当线程增长和缩小在一起时,我脑海中只出现了一个名字:来自java.util.concurrent包的CachedThreadPool。
ExecutorService executor = Executors.newCachedThreadPool();CachedThreadPool()可以重用线程,也可以在需要时创建新线程。是的,如果一个线程空闲了60秒,CachedThreadPool会杀死它。所以这是非常轻量级的--用你的话说就是增长和缩小!
发布于 2012-06-29 00:58:42
将maximumPoolSize设置为Integer.MAX_VALUE。如果你有超过20亿的threads...well,祝你好运。
无论如何,ThreadPoolExecutor的Javadoc声明:
通过将maximumPoolSize设置为一个基本无界的值(如Integer.MAX_VALUE ),您可以允许池容纳任意数量的并发任务。最典型的情况是,核心和最大池大小仅在构造时设置,但也可以使用setCorePoolSize(int)和setMaximumPoolSize(int)动态更改它们。
对于类似于LinkedBlockingQueue的无界任务队列,这应该具有任意大的容量。
https://stackoverflow.com/questions/11249342
复制相似问题