我找了很多东西,却找不到解决问题的办法。
我有自己的类BaseTask,它使用ThreadPoolExecutor来处理任务。我想要任务优先级,但是当我尝试使用PriorityBlockingQueue时,我得到了ClassCastException,因为ThreadPoolExecutor将我的任务封装到FutureTask对象中。
这显然是有意义的,因为FutureTask没有实现Comparable,但是我将如何继续解决优先级问题?我已经读到您可以在newTaskFor()中重写ThreadPoolExecutor,但是我似乎根本找不到这个方法.?
任何建议都将不胜感激!
一些需要帮助的代码:
在我的BaseTask课上
private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);
private final BaseFutureTask<Result> mFuture;
public BaseTask(int priority) {
mFuture = new BaseFutureTask<Result>(mWorker, priority);
}
public final BaseTask<Params, Progress, Result> execute(Params... params) {
/* Some unimportant code here */
sExecutor.execute(mFuture);
}在BaseFutureTask类中
@Override
public int compareTo(BaseFutureTask another) {
long diff = this.priority - another.priority;
return Long.signum(diff);
}在BaseThreadPoolExecutor类中,我重写了3种submit方法.这个类中的构造函数被调用,但是没有一个submit方法
发布于 2011-03-30 11:37:20
public class ExecutorPriority {
public static void main(String[] args) {
PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());
Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
exe.execute(new RunWithPriority(2) {
@Override
public void run() {
System.out.println(this.getPriority() + " started");
try {
Thread.sleep(3000);
} catch (InterruptedException ex) {
Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println(this.getPriority() + " finished");
}
});
exe.execute(new RunWithPriority(10) {
@Override
public void run() {
System.out.println(this.getPriority() + " started");
try {
Thread.sleep(3000);
} catch (InterruptedException ex) {
Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println(this.getPriority() + " finished");
}
});
}
private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {
@Override
public int compare(T o1, T o2) {
return o1.getPriority().compareTo(o2.getPriority());
}
}}
可以猜到,RunWithPriority是一个抽象类,它是可运行的,具有整数优先级字段。
发布于 2013-05-16 01:44:11
您可以使用以下帮助类:
public class PriorityFuture<T> implements RunnableFuture<T> {
private RunnableFuture<T> src;
private int priority;
public PriorityFuture(RunnableFuture<T> other, int priority) {
this.src = other;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return src.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return src.isCancelled();
}
public boolean isDone() {
return src.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return src.get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return src.get();
}
public void run() {
src.run();
}
public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
public int compare(Runnable o1, Runnable o2) {
if (o1 == null && o2 == null)
return 0;
else if (o1 == null)
return -1;
else if (o2 == null)
return 1;
else {
int p1 = ((PriorityFuture<?>) o1).getPriority();
int p2 = ((PriorityFuture<?>) o2).getPriority();
return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
}
}
};
}和
public interface PriorityCallable<T> extends Callable<T> {
int getPriority();
}和这个助手方法:
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
}
};
}然后,和像这样使用它:
class LenthyJob implements PriorityCallable<Long> {
private int priority;
public LenthyJob(int priority) {
this.priority = priority;
}
public Long call() throws Exception {
System.out.println("Executing: " + priority);
long num = 1000000;
for (int i = 0; i < 1000000; i++) {
num *= Math.random() * 1000;
num /= Math.random() * 1000;
if (num == 0)
num = 1000000;
}
return num;
}
public int getPriority() {
return priority;
}
}
public class TestPQ {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor exec = getPriorityExecutor(2);
for (int i = 0; i < 20; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("Scheduling: " + priority);
LenthyJob job = new LenthyJob(priority);
exec.submit(job);
}
}
}发布于 2016-01-24 09:55:13
我将尝试用一个功能齐全的代码来解释这个问题。但在深入研究代码之前,我想先解释一下PriorityBlockingQueue
PriorityBlockingQueue:PriorityBlockingQueue是BlockingQueue的一个实现。它接受任务及其优先级,并首先提交优先级最高的任务以供执行。如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定哪个任务先执行。
现在让我们直接进入代码。
驱动程序类:该类创建一个执行器,该执行器接受任务,然后提交它们以供执行。这里我们创建了两个任务,一个是低优先级的任务,另一个是高优先级的任务。在这里,我们告诉执行器运行最多1个线程并使用PriorityBlockingQueue。
public static void main(String[] args) {
/*
Minimum number of threads that must be running : 0
Maximium number of threads that can be created : 1
If a thread is idle, then the minimum time to keep it alive : 1000
Which queue to use : PriorityBlockingQueue
*/
PriorityBlockingQueue queue = new PriorityBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
1000, TimeUnit.MILLISECONDS,queue);
MyTask task = new MyTask(Priority.LOW,"Low");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.HIGH,"High");
executor.execute(new MyFutureTask(task));
}MyTask类:MyTask实现Runnable,并在构造函数中接受优先级作为参数。当此任务运行时,它会打印一条消息,然后使线程休眠1秒。
public class MyTask implements Runnable {
public int getPriority() {
return priority.getValue();
}
private Priority priority;
public String getName() {
return name;
}
private String name;
public MyTask(Priority priority,String name){
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("The following Runnable is getting executed "+getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}MyFutureTask类:由于我们使用PriorityBlocingQueue来保存任务,我们的任务必须包装在FutureTask中,而FutureTask的实现必须实现类似的接口。可比较的接口比较两个不同任务的优先级,并提交优先级最高的任务执行。
public class MyFutureTask extends FutureTask<MyFutureTask>
implements Comparable<MyFutureTask> {
private MyTask task = null;
public MyFutureTask(MyTask task){
super(task,null);
this.task = task;
}
@Override
public int compareTo(MyFutureTask another) {
return task.getPriority() - another.task.getPriority();
}
}优先级类:不言自明的优先级类。
public enum Priority {
HIGHEST(0),
HIGH(1),
MEDIUM(2),
LOW(3),
LOWEST(4);
int value;
Priority(int val) {
this.value = val;
}
public int getValue(){
return value;
}
}现在,当我们运行这个例子时,我们得到了以下输出
The following Runnable is getting executed High
The following Runnable is getting executed Low尽管我们先提交了低优先级的任务,但随后提交了高优先级的任务,但是由于我们使用的是PriorityBlockingQueue,所以具有较高优先级的任务将首先执行。
https://stackoverflow.com/questions/3545623
复制相似问题