
Wait for cancel() on FutureTask

Stack Overflow user
Asked 2011-05-18 14:56:10
5 answers, 4.1K views, 0 followers, 14 votes

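I want to cancel a FutureTask that I got from a ThreadPoolExecutor, but I want to be sure that the callable running on the thread pool has actually stopped its work.

If I call FutureTask#cancel(false) and then get() (which blocks until completion), I get a CancellationException. Is this exception thrown immediately, or only after the task has stopped executing?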


5 Answers

Stack Overflow user

Accepted answer

Posted 2015-05-07 00:12:02

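This answer fixes the race condition in Aleksey's and FooJBar's code by checking, inside the callable, whether the task has been cancelled. (There is a window between FutureTask.run checking the state and running the callable, during which both cancel and getWithJoin can complete successfully. The callable will nevertheless still run.)

I also decided not to override the original cancel, because the new cancel needs to declare InterruptedException. The new cancel drops the useless return value (true can mean any of "the task has not started yet", "the task has started and has already done most of its damage", or "the task has started and will eventually complete"). The check of super.cancel's return value is gone as well, so if the new cancel is called several times from different threads, every call waits for the task to complete.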

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Based on: http://stackoverflow.com/questions/6040962/wait-for-cancel-on-futuretask
 * 
 * @author Aleksandr Dubinsky
 */
public class FixedFutureTask<T> extends FutureTask<T> {

     /**
      * Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable}, 
      * and arrange that {@code get} will return the given result on successful completion.
      *
      * @param runnable the runnable task
      * @param result the result to return on successful completion. 
      *               If you don't need a particular result, consider using constructions of the form:
      *               {@code Future<?> f = new FutureTask<Void>(runnable, null)}
      * @throws NullPointerException if the runnable is null
      */
      public 
    FixedFutureTask (Runnable runnable, T result) {
            this (Executors.callable (runnable, result));
        }

     /**
      * Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
      *
      * @param  callable the callable task
      * @throws NullPointerException if the callable is null
      */
      public 
    FixedFutureTask (Callable<T> callable) {
            this (new MyCallable<T> (callable));
        }

      /** Some ugly code to work around the compiler's limitations on constructors */
      private 
    FixedFutureTask (MyCallable<T> myCallable) {
            super (myCallable);
            myCallable.task = this;
        }

    private final Semaphore semaphore = new Semaphore(1);

    private static class MyCallable<T> implements Callable<T>
    {
        MyCallable (Callable<T> callable) {
                this.callable = callable;
            }

        final Callable<T> callable;
        FixedFutureTask<T> task;

          @Override public T
        call() throws Exception {

                task.semaphore.acquire();
                try 
                {
                    if (task.isCancelled())
                        return null;

                    return callable.call();
                }
                finally 
                {
                    task.semaphore.release();
                }
            }
    }

     /**
      * Waits if necessary for the computation to complete or finish cancelling, and then retrieves its result, if available.
      *
      * @return the computed result
      * @throws CancellationException if the computation was cancelled
      * @throws ExecutionException if the computation threw an exception
      * @throws InterruptedException if the current thread was interrupted while waiting
      */
      @Override public T 
    get() throws InterruptedException, ExecutionException, CancellationException {

            try 
            {
                return super.get();
            }
            catch (CancellationException e) 
            {
                semaphore.acquire();
                semaphore.release();
                throw e;
            }
        }

     /**
      * Waits if necessary for at most the given time for the computation to complete or finish cancelling, and then retrieves its result, if available.
      *
      * @param timeout the maximum time to wait
      * @param unit the time unit of the timeout argument
      * @return the computed result
      * @throws CancellationException if the computation was cancelled
      * @throws ExecutionException if the computation threw an exception
      * @throws InterruptedException if the current thread was interrupted while waiting
      * @throws TimeoutException if the wait timed out
      */
      @Override public T
    get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException {

            try 
            {
                return super.get (timeout, unit);
            }
            catch (CancellationException e) 
            {
                semaphore.acquire();
                semaphore.release();
                throw e;
            }
        }

     /**
      * Attempts to cancel execution of this task and waits for the task to complete if it has been started.
      * If the task has not started when {@code cancelAndWait} is called, this task should never run.
      * If the task has already started, then the {@code mayInterruptIfRunning} parameter determines
      * whether the thread executing this task should be interrupted in an attempt to stop the task.
      *
      * <p>After this method returns, subsequent calls to {@link #isDone} will
      * always return {@code true}.  Subsequent calls to {@link #isCancelled}
      * will return {@code true} only if the task was cancelled before it completed normally.
      *
      * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted; 
      *                              otherwise, in-progress tasks are allowed to complete
      * @throws InterruptedException if the thread is interrupted
      */
      public void
    cancelAndWait (boolean mayInterruptIfRunning) throws InterruptedException {

            super.cancel (mayInterruptIfRunning);

            semaphore.acquire();
            semaphore.release();
        }
}
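
A rough usage sketch of the cancel-and-wait idea follows. `JoiningTask`, `Guarded` and `CancelAndWaitDemo` are hypothetical names, and `JoiningTask` is a condensed stand-in for `FixedFutureTask` (same semaphore trick, fewer methods) so that the example compiles on its own. The task body simulates slow cleanup after being interrupted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelAndWaitDemo {

    /** Condensed, hypothetical stand-in for FixedFutureTask. */
    static final class JoiningTask<T> extends FutureTask<T> {
        // Held by the task body for as long as it is executing.
        private final Semaphore running = new Semaphore(1);

        JoiningTask(Callable<T> body) { this(new Guarded<T>(body)); }

        private JoiningTask(Guarded<T> guarded) { super(guarded); guarded.task = this; }

        private static final class Guarded<T> implements Callable<T> {
            final Callable<T> body;
            JoiningTask<T> task;

            Guarded(Callable<T> body) { this.body = body; }

            @Override public T call() throws Exception {
                task.running.acquire();
                try {
                    // Skip the body if cancel() won the race before we got here.
                    return task.isCancelled() ? null : body.call();
                } finally {
                    task.running.release();
                }
            }
        }

        /** Cancels, then blocks until the body (if it was started) has returned. */
        void cancelAndWait(boolean mayInterruptIfRunning) throws InterruptedException {
            super.cancel(mayInterruptIfRunning);
            running.acquire();
            running.release();
        }
    }

    /** Returns true if the task body had fully finished by the time cancelAndWait returned. */
    static boolean bodyFinishedBeforeCancelReturned() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean bodyFinished = new AtomicBoolean(false);

        JoiningTask<Void> task = new JoiningTask<Void>(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);       // stand-in for long work; cancel(true) interrupts it
            } catch (InterruptedException e) {
                Thread.sleep(100);          // simulated cleanup that takes a while
            } finally {
                bodyFinished.set(true);
            }
            return null;
        });

        try {
            pool.execute(task);
            started.await();                // make sure the body is actually running
            task.cancelAndWait(true);       // returns only after the body has exited
            return bodyFinished.get() && task.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("body finished before cancelAndWait returned: "
                + bodyFinishedBeforeCancelReturned());
    }
}
```

With a plain `FutureTask`, `cancel(true)` would return while the cleanup was still in progress, so the flag could still be false at that point.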
Votes: 1

Stack Overflow user

Posted 2013-04-08 14:20:14

Yes, the CancellationException is thrown immediately. You can extend FutureTask to add a version of the get() method that waits for the Callable's thread to finish.

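import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;

public class ThreadWaitingFutureTask<T> extends FutureTask<T> {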

    private final Semaphore semaphore;

    public ThreadWaitingFutureTask(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    public T getWithJoin() throws InterruptedException, ExecutionException {
        try {
            return super.get();
        }
        catch (CancellationException e) {
            semaphore.acquire();
            semaphore.release();
            throw e;
        }
    }

    private ThreadWaitingFutureTask(final Callable<T> callable, 
                final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                }
                finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}
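
The "thrown immediately" behavior can be observed with a small experiment on a plain executor. This is only a sketch: `ImmediateCancelDemo` and its method name are made up for illustration, and latches keep the task body blocked so that it cannot have finished when `get()` throws.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ImmediateCancelDemo {

    /** Returns true if get() threw CancellationException while the task body was still running. */
    static boolean getThrowsBeforeTaskStops() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFinish = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    mayFinish.await();      // simulated work that is still in progress
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            });

            started.await();                // the task body is now running
            future.cancel(false);           // cancel without interrupting the worker
            try {
                future.get();
                return false;               // not expected: get() should have thrown
            } catch (CancellationException expected) {
                // The body is still blocked on mayFinish, so it cannot have finished yet.
                return finished.getCount() == 1;
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            mayFinish.countDown();          // let the body finish so the pool can shut down
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("get() threw while the task was still running: "
                + getThrowsBeforeTaskStops());
    }
}
```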
Votes: 2

Stack Overflow user

Posted 2013-11-20 20:00:37

Aleksey's example works well. I wrote a variant with a constructor that takes a Runnable (which will return null), and which shows how to block (join) directly on cancel():

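import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;

public class FutureTaskCancelWaits<T> extends FutureTask<T> {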

    private final Semaphore semaphore;

    public FutureTaskCancelWaits(Runnable runnable) {
        this(Executors.callable(runnable, (T) null));
    }

    public FutureTaskCancelWaits(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // If the task was successfully cancelled, block here until call() returns
        if (super.cancel(mayInterruptIfRunning)) {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                // Interrupted while waiting: no permit was taken, so none may be released.
                // Restore the interrupt flag for the caller.
                Thread.currentThread().interrupt();
                return false;
            }
            // call() has returned (or never started); hand the permit back
            semaphore.release();
            return true;
        }
        return false;
    }

    private FutureTaskCancelWaits(final Callable<T> callable, final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                } finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}
Votes: 2
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/6040962
