首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何扩展FutureTask并确保释放对可调用的引用?

如何扩展FutureTask并确保释放对可调用的引用?
EN

Stack Overflow用户
提问于 2015-01-08 12:25:49
回答 1查看 937关注 0票数 1

我有一个自定义的ExecutorService,其中包含一个ScheduledExecutorService,可以用来中断提交给ExecutorSerice的任务,如果它们花费的时间太长,我已经将complet类放在文章的末尾。

这是正常的,只是有时中断本身会造成问题,所以我将一个易失性的布尔值、cancel标志添加到一个新的CanceableTask类中,并使它们成为子类,这样如果布尔值被发送为true,它们就可以检查和停止自己。注意,它们是提交给executor服务precisley的每个类中布尔值的一个实例,这样就可以在不取消其他任务的情况下取消长期运行的任务。

但是,FutureTask作为参数传递给beforeExecute(Thread,Runnable r),这不会授予对可调用类的访问权,因此我的超时代码无法设置cancel标志。

为此,我重写了newTaskFor方法以返回一个类,该类仅提供对可调用对象的引用。

代码语言:javascript
复制
public class FutureCallable<V> extends FutureTask<V>
{
    private Callable<V> callable;
    public FutureCallable(Callable<V> callable) {
        super(callable);
        this.callable = callable;
    }
    public Callable<V> getCallable() {
        return callable;
    }
}

一切都很顺利,或者说我是这么想的。

不幸的是,随着新任务提交到ExecutorService并最终耗尽内存,我的应用程序使用了越来越多的内存,当我分析应用程序时,我发现所有FutureCallables都有一个线程堆栈本地引用,即使在任务完成之后,而且由于FutureCallable引用了正在运行的类,所以它使用了大量内存。

当我查看FutureTask的代码(FutureCallable扩展)时,有一个针对私有可调用引用的注释:

代码语言:javascript
复制
/** The underlying callable; nulled out after running */

那么,我如何改进我的FutureCallable,使其对可调用的引用无效呢?或者为什么在任务完成后对维护的FutureCallable有一个引用。

我已经确认,如果我注释掉了newTaskFor方法,就不会有过多的内存使用,但不幸的是,我不能取消这个类。

全班是:

代码语言:javascript
复制
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final static int WAIT_BEFORE_INTERRUPT = 10000;
    private final static int WAIT_BEFORE_STOP      = 10000;


    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Future of the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        MainWindow.logger.severe("Init:"+workerSize+":Timeout:"+timeout+":"+timeoutUnit);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout starting from now
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t, r), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //AfterExecute will be called after the task has completed, either of its own accord or because it
        //took too long and was interrupted by corresponding timeout task
        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.severe("---Shutdown TimeoutExecutor");
        timeoutExecutor.shutdown();
    }

    /**
     * Interrupt or possibly stop the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final       Thread thread;
        private             Callable c;

        public TimeoutTask(Thread thread, Runnable c) {
            this.thread = thread;
            if(c instanceof FutureCallable)
            {
                this.c = ((FutureCallable) c).getCallable();
            }
        }

        @Override
        public void run()
        {
            String msg = "";
            if (c != null)
            {
                if (c != null && c instanceof CancelableTask)
                {
                    MainWindow.logger.severe("+++Cancelling " + msg + " task because taking too long");
                    ((CancelableTask) c).setCancelTask(true);
                }
            }
        }
    }
}

    public abstract class CancelableTask  extends ExecutorServiceEnabledAnalyser
    {
        private volatile boolean cancelTask = false;

        public boolean isCancelTask() {
            return cancelTask;
        }

        public void setCancelTask(boolean cancelTask) {
            this.cancelTask = cancelTask;
        }

        CancelableTask(final MainWindow start, boolean isSelectedRecords, boolean isUseRowSelection)
        {
            super(start, isSelectedRecords, isUseRowSelection);
        }

        CancelableTask(final MainWindow start, List<MetadataChangedWrapper> songs)
        {
            super(start, songs );
        }

    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-01-08 12:42:26

这个ThreadLocal在哪里?我觉得很奇怪,也很难相信你在说什么,它能保持对所有任务的引用,即使是在完成之后。如果是这样的话,即使没有覆盖,它最终也会耗尽内存(任务本身使用一些内存,尽管可能比可调用的内存少,但仍然不是零)。

无论如何,您可以在您的done上重写FutureCallable方法,以便在执行后将包装的对象空出。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27840120

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档