如以下连结所述:
如何使ThreadPoolExecutor在排队前将线程增加到最大?
在输入元素后,我将队列实现更改为返回false。因此,每当向队列中插入新任务时,都会为其创建一个新线程。
但是,当我在记录器上大规模运行以下实现(Bis系统测试)时,会产生一个新的问题。
当任务执行时,它被插入到队列中,当队列返回false时,将创建一个新线程来执行它。当前在池中的空闲线程不会被捕获。任务的原因分配给方法中的空闲线程,后者从队列中选择任务。因此,我的问题是如何更改此行为,以便如果线程空闲,如何确保为执行分配空闲线程,而不是创建新线程??。
低于产出将更清楚地表明:
Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0这些代码文件如下:
ThreadPoolExecutor是Java1.5版,因为我们在服务器上使用的是1.5,不能升级它。
ThreadPoolExecutor:-
public void execute(Runnable command) {
System.out.println("Active Count: " + getActiveCount()
+ " Pool Size : " + getPoolSize() + " Idle Count: "
+ (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
if (command == null)
throw new NullPointerException();
for (;;) {
if (runState != RUNNING) {
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
if (workQueue.offer(command))
return;
int status = addIfUnderMaximumPoolSize(command);
if (status > 0) // created new thread
return;
if (status == 0) { // failed to create thread
reject(command);
return;
}
// Retry if created a new thread but it is busy with another task
}
}LinkedBlockingQueue:-
public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E>
{
/**
*
*/
private static final long serialVersionUID = 1L;
public CustomBlockingQueue() {
super(Integer.MAX_VALUE);
}
public boolean offer(E e) {
return false;
}
}在拒绝处理程序中,我们调用未覆盖的队列的put方法
Callingexecutor
final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());
private static final int TASK_COUNT = 100;
for (int i = 0; i < TASK_COUNT; i++) {
......
tpe.execute(new Task(i));
.....
}我们调用核心池大小为3的执行器,最大池大小为8,任务使用无限制链接阻塞队列。
发布于 2013-10-30 12:09:56
使用SynchronousQueue实现“排队前先开始,但更喜欢现有线程”行为的最简单方法。如果并且只有在已经有等待接收者的情况下,它才会接受提供的物品。因此,空闲线程将获得项,一旦没有空闲线程,ThreadPoolExecutor将启动新线程。
唯一的缺点是,一旦启动了所有线程,就不能简单地将挂起的项放入队列中,因为它没有容量。因此,要么必须接受提交者被阻塞,要么需要另一个队列将挂起的任务放到它,另一个后台线程试图将这些挂起的项放到同步队列中。这个额外的线程不会影响性能,因为它大部分时间在这两个队列中被阻塞。
class QueuingRejectionHandler implements RejectedExecutionHandler {
final ExecutorService processPending=Executors.newSingleThreadExecutor();
public void rejectedExecution(
final Runnable r, final ThreadPoolExecutor executor) {
processPending.execute(new Runnable() {
public void run() {
executor.execute(r);
}
});
}
}…
ThreadPoolExecutor e=new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit,
new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());发布于 2013-10-30 22:04:19
我相信你的问题有以下几点:
public boolean offer(E e) {
return false;
}这将始终将false返回到TPE,这将导致它启动另一个线程,而不管当前有多少线程处于空闲状态。这不是我在这个答案上的代码示例所推荐的。在反馈之后,我不得不纠正早期的一个问题。
我的回答是让您的offer(...)方法看起来像:
public boolean offer(Runnable e) {
/*
* Offer it to the queue if there is 1 or 0 items already queued, else
* return false so the TPE will add another thread.
*/
if (size() <= 1) {
return super.offer(e);
} else {
return false;
}
}因此,如果队列中已经有两个或更多的东西,它将分叉另一个线程,否则它会将任务排在队列中,这些任务应该由空闲线程来捕获。您还可以使用1值。使用0或更多的1尝试它可能适合您的应用程序。将该值注入您的CustomBlockingQueue可能是合适的。
发布于 2013-10-30 12:12:30
灰色这里给出的解决方案很棒,但我面临着与您的解决方案相同的问题,即理想的线程不是用来选择新任务的,而是创建了新线程,以防poolSize小于maxPoolSize。
因此,我试图修改ThreadPoolExecutor本身的功能,方法是复制完整的类(不是一个好主意,但找不到任何其他解决方案),并使用ThreadPoolExecutor和重写执行方法对其进行扩展。
以下是方法:
public void execute(Runnable command)
{
System.out.println("ActiveCount : " + this.getActiveCount()
+ " PoolSize : " + this.getPoolSize() + " QueueSize : "
+ this.getQueue().size());
if (command == null)
throw new NullPointerException();
for (;;)
{
if (runState != RUNNING)
{
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
//Now, it will never offer to queue but will go further for thread creation.
//if (workQueue.offer(command))
//return;
//This check is introduced to utilized ideal threads instead of creating new thread
//for incoming tasks.
//Example : coreSize = 3, maxPoolSize = 8.
//activeCount = 4, and PoolSize = 5, so 1 thread is ideal Currently queue is empty.
//When new task comes, it will offer that to queue, and getTask() will take care and execute the task.
//But if new task comes, before ideal thread takes task from queue,
//activeCount = 4, and PoolSize = 5, so 1 thread is ideal Currently queue size = 1.
//this check fails and new thread is created if poolsize under max size or
//task is added to queue through rejection handler.
if ((this.getPoolSize() - this.getActiveCount()) > 0 &&
(this.getPoolSize() - this.getActiveCount() - workQueue.size()) > 0)
{
workQueue.offer(command);
return;
}
int status = addIfUnderMaximumPoolSize(command);
if (status > 0) // created new thread
return;
if (status == 0)
{ // failed to create thread
reject(command);
return;
}
// Retry if created a new thread but it is busy with another task
}
}在拒绝处理程序中,我使用put方法将任务放入队列(无界),正如Gray所建议的那样。:)
注意:我没有在代码中覆盖队列的行为。
https://stackoverflow.com/questions/19677308
复制相似问题