首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >改变ThreadPoolExecutor

改变ThreadPoolExecutor
EN

Stack Overflow用户
提问于 2013-10-30 08:32:40
回答 4查看 2K关注 0票数 2

如以下连结所述:

如何使ThreadPoolExecutor在排队前将线程增加到最大?

在输入元素后,我将队列实现更改为返回false。因此,每当向队列中插入新任务时,都会为其创建一个新线程。

但是,当我在记录器上大规模运行以下实现(Bis系统测试)时,会产生一个新的问题。

当任务执行时,它被插入到队列中,当队列返回false时,将创建一个新线程来执行它。当前在池中的空闲线程不会被捕获。任务的原因分配给方法中的空闲线程,后者从队列中选择任务。因此,我的问题是如何更改此行为,以便如果线程空闲,如何确保为执行分配空闲线程,而不是创建新线程??

低于产出将更清楚地表明:

代码语言:javascript
复制
Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0

这些代码文件如下:

ThreadPoolExecutor是Java1.5版,因为我们在服务器上使用的是1.5,不能升级它。

ThreadPoolExecutor:-

代码语言:javascript
复制
 public void execute(Runnable command) {
    System.out.println("Active Count: " + getActiveCount()
            + " Pool Size : " + getPoolSize() + " Idle Count: "
            + (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
          if (command == null)
              throw new NullPointerException();
          for (;;) {
              if (runState != RUNNING) {
                  reject(command);
                  return;
              }
              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                  return;
              if (workQueue.offer(command))
                  return;
              int status = addIfUnderMaximumPoolSize(command);
              if (status > 0)      // created new thread
                  return;
              if (status == 0) {   // failed to create thread
                  reject(command);
                  return;
              }
              // Retry if created a new thread but it is busy with another task
          }
      }

LinkedBlockingQueue:-

代码语言:javascript
复制
public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> 
{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public CustomBlockingQueue() {
    super(Integer.MAX_VALUE);
    }

    public boolean offer(E e) {
       return false;
    }

}

在拒绝处理程序中,我们调用未覆盖的队列的put方法

Callingexecutor

代码语言:javascript
复制
   final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());

private static final int TASK_COUNT = 100;

 for (int i = 0; i < TASK_COUNT; i++) {

......
tpe.execute(new Task(i));
.....
}

我们调用核心池大小为3的执行器,最大池大小为8,任务使用无限制链接阻塞队列。

EN

回答 4

Stack Overflow用户

发布于 2013-10-30 12:09:56

使用SynchronousQueue实现“排队前先开始,但更喜欢现有线程”行为的最简单方法。如果并且只有在已经有等待接收者的情况下,它才会接受提供的物品。因此,空闲线程将获得项,一旦没有空闲线程,ThreadPoolExecutor将启动新线程。

唯一的缺点是,一旦启动了所有线程,就不能简单地将挂起的项放入队列中,因为它没有容量。因此,要么必须接受提交者被阻塞,要么需要另一个队列将挂起的任务放到它,另一个后台线程试图将这些挂起的项放到同步队列中。这个额外的线程不会影响性能,因为它大部分时间在这两个队列中被阻塞。

代码语言:javascript
复制
class QueuingRejectionHandler implements RejectedExecutionHandler {

  final ExecutorService processPending=Executors.newSingleThreadExecutor();

  public void rejectedExecution(
      final Runnable r, final ThreadPoolExecutor executor) {
    processPending.execute(new Runnable() {
      public void run() {
        executor.execute(r);
      }
    });
  }
}

代码语言:javascript
复制
ThreadPoolExecutor e=new ThreadPoolExecutor(
  corePoolSize, maximumPoolSize, keepAliveTime, unit,
  new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());
票数 1
EN

Stack Overflow用户

发布于 2013-10-30 22:04:19

我相信你的问题有以下几点:

代码语言:javascript
复制
public boolean offer(E e) {
   return false;
}

这将始终将false返回到TPE,这将导致它启动另一个线程,而不管当前有多少线程处于空闲状态。这不是我在这个答案上的代码示例所推荐的。在反馈之后,我不得不纠正早期的一个问题。

我的回答是让您的offer(...)方法看起来像:

代码语言:javascript
复制
public boolean offer(Runnable e) {
    /*
     * Offer it to the queue if there is 1 or 0 items already queued, else
     * return false so the TPE will add another thread.
     */
    if (size() <= 1) {
        return super.offer(e);
    } else {
        return false;
    }
}

因此,如果队列中已经有两个或更多的东西,它将分叉另一个线程,否则它会将任务排在队列中,这些任务应该由空闲线程来捕获。您还可以使用1值。使用0或更多的1尝试它可能适合您的应用程序。将该值注入您的CustomBlockingQueue可能是合适的。

票数 1
EN

Stack Overflow用户

发布于 2013-10-30 12:12:30

灰色这里给出的解决方案很棒,但我面临着与您的解决方案相同的问题,即理想的线程不是用来选择新任务的,而是创建了新线程,以防poolSize小于maxPoolSize。

因此,我试图修改ThreadPoolExecutor本身的功能,方法是复制完整的类(不是一个好主意,但找不到任何其他解决方案),并使用ThreadPoolExecutor和重写执行方法对其进行扩展。

以下是方法:

代码语言:javascript
复制
public void execute(Runnable command)
{
 System.out.println("ActiveCount : " + this.getActiveCount()
            + " PoolSize : " + this.getPoolSize() + " QueueSize : "
            + this.getQueue().size());

if (command == null)
    throw new NullPointerException();
for (;;)
{
    if (runState != RUNNING)
    {
    reject(command);
    return;
    }
    if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
    return;
    //Now, it will never offer to queue but will go further for thread creation.
    //if (workQueue.offer(command))
    //return;


    //This check is introduced to utilized ideal threads instead of creating new thread 
    //for incoming tasks.
    //Example : coreSize = 3, maxPoolSize = 8.
    //activeCount = 4, and PoolSize = 5, so 1 thread is ideal Currently queue is empty. 
    //When new task comes, it will offer that to queue, and getTask() will take care and execute the task.
    //But if new task comes, before ideal thread takes task from queue, 
    //activeCount = 4, and PoolSize = 5, so 1 thread is ideal Currently queue size = 1.
    //this check fails and new thread is created if poolsize under max size or 
    //task is added to queue through rejection handler.

    if ((this.getPoolSize() - this.getActiveCount()) > 0 && 
        (this.getPoolSize() - this.getActiveCount() - workQueue.size()) > 0)
    {
    workQueue.offer(command);
    return;
    }
    int status = addIfUnderMaximumPoolSize(command);
    if (status > 0) // created new thread
    return;
    if (status == 0)
    { // failed to create thread
    reject(command);
    return;
    }
    // Retry if created a new thread but it is busy with another task
}
}

在拒绝处理程序中,我使用put方法将任务放入队列(无界),正如Gray所建议的那样。:)

注意:我没有在代码中覆盖队列的行为。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/19677308

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档