首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用PriorityBlockingQueue和自定义任务实现ThreadPoolExecutor

如何使用PriorityBlockingQueue和自定义任务实现ThreadPoolExecutor
EN

Stack Overflow用户
提问于 2010-08-23 07:52:53
回答 6查看 22.6K关注 0票数 26

我找了很多东西,却找不到解决问题的办法。

我有自己的类BaseTask,它使用ThreadPoolExecutor来处理任务。我想要任务优先级,但是当我尝试使用PriorityBlockingQueue时,我得到了ClassCastException,因为ThreadPoolExecutor将我的任务封装到FutureTask对象中。

这显然是有意义的,因为FutureTask没有实现Comparable,但是我将如何继续解决优先级问题?我已经读到您可以在newTaskFor()中重写ThreadPoolExecutor,但是我似乎根本找不到这个方法.?

任何建议都将不胜感激!

一些需要帮助的代码:

在我的BaseTask课上

代码语言:javascript
复制
private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
}

BaseFutureTask类中

代码语言:javascript
复制
@Override
public int compareTo(BaseFutureTask another) {
    long diff = this.priority - another.priority;

    return Long.signum(diff);
}

BaseThreadPoolExecutor类中,我重写了3种submit方法.这个类中的构造函数被调用,但是没有一个submit方法

EN

回答 6

Stack Overflow用户

发布于 2011-03-30 11:37:20

代码语言:javascript
复制
public class ExecutorPriority {

public static void main(String[] args) {

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
    exe.execute(new RunWithPriority(2) {

        @Override
        public void run() {

            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });
    exe.execute(new RunWithPriority(10) {

        @Override
        public void run() {
            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });

}

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {
        return o1.getPriority().compareTo(o2.getPriority());
    }
}

}

可以猜到,RunWithPriority是一个抽象类,它是可运行的,具有整数优先级字段。

票数 14
EN

Stack Overflow用户

发布于 2013-05-16 01:44:11

您可以使用以下帮助类:

代码语言:javascript
复制
public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

代码语言:javascript
复制
public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

这个助手方法:

代码语言:javascript
复制
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

然后,像这样使用它:

代码语言:javascript
复制
class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
票数 11
EN

Stack Overflow用户

发布于 2016-01-24 09:55:13

我将尝试用一个功能齐全的代码来解释这个问题。但在深入研究代码之前,我想先解释一下PriorityBlockingQueue

PriorityBlockingQueue:PriorityBlockingQueue是BlockingQueue的一个实现。它接受任务及其优先级,并首先提交优先级最高的任务以供执行。如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定哪个任务先执行。

现在让我们直接进入代码。

驱动程序类:该类创建一个执行器,该执行器接受任务,然后提交它们以供执行。这里我们创建了两个任务,一个是低优先级的任务,另一个是高优先级的任务。在这里,我们告诉执行器运行最多1个线程并使用PriorityBlockingQueue。

代码语言:javascript
复制
     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
}

MyTask类:MyTask实现Runnable,并在构造函数中接受优先级作为参数。当此任务运行时,它会打印一条消息,然后使线程休眠1秒。

代码语言:javascript
复制
   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask类:由于我们使用PriorityBlocingQueue来保存任务,我们的任务必须包装在FutureTask中,而FutureTask的实现必须实现类似的接口。可比较的接口比较两个不同任务的优先级,并提交优先级最高的任务执行。

代码语言:javascript
复制
 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

优先级类:不言自明的优先级类。

代码语言:javascript
复制
public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

现在,当我们运行这个例子时,我们得到了以下输出

代码语言:javascript
复制
The following Runnable is getting executed High
The following Runnable is getting executed Low

尽管我们先提交了低优先级的任务,但随后提交了高优先级的任务,但是由于我们使用的是PriorityBlockingQueue,所以具有较高优先级的任务将首先执行。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/3545623

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档