首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >什么是终止我的Java ExecutorService

什么是终止我的Java ExecutorService
EN

Stack Overflow用户
提问于 2019-07-08 14:27:34
回答 2查看 1.3K关注 0票数 8

我最初在ThreadPoolExecutor的一个更复杂的子类中看到了这个问题,但是我已经简化了,所以现在只包含了一些额外的调试,并且仍然会遇到同样的问题。

代码语言:javascript
复制
import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.logging.Level;



public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    /**
     * Uses the default CallerRunsPolicy when queue is full
     *  @param workerSize
     * @param threadFactory
     * @param queue
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new CallerRunsPolicy());
    }

    /**
     * Allow caller to specify the RejectedExecutionPolicy
     *  @param workerSize
     * @param threadFactory
     * @param queue
     * @param reh
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue, RejectedExecutionHandler reh)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, reh);
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    /**
     * Check not been paused
     *
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        SongKong.checkIn();
    }

    /**
     * After execution
     *
     * @param r
     * @param t
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t)
    {
        super.afterExecute(r, t);

        if (t == null && r instanceof Future<?>)
        {
            try
            {
              Object result = ((Future<?>) r).get();
            }
            catch (CancellationException ce)
            {
                t = ce;
            }
            catch (ExecutionException ee)
            {
                t = ee.getCause();
            }
            catch (InterruptedException ie)
            {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
        {
            MainWindow.logger.log(Level.SEVERE, "AFTER EXECUTE---" + t.getMessage(), t);
        }
    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
    }

    @Override
    public void shutdown()
    {
        MainWindow.logger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
        super.shutdown();
    }
}

下面的类使用此ExecutorService,该类允许实例异步提交任务,在所有提交的任务完成之前不应关闭ExecutorService。

代码语言:javascript
复制
package com.jthink.songkong.analyse.analyser;

import com.jthink.songkong.preferences.GeneralPreferences;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
 *  Sets a timeout of each task submitted and cancel them if take longer than the timeout
 *
 *  The timeout is set to 30 minutes, we only want to call if really broken, it should not happen under usual circumstances
 */
public class MainAnalyserService extends AnalyserService
{
    //For monitoring/controlling when finished
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    //If task has not completed 30 minutes after it started (added to queue) then it should be cancelled
    private static final int TIMEOUT_PER_TASK = 30;

    private static MainAnalyserService mas;

    public static MainAnalyserService getInstanceOf()
    {
        return mas;
    }

    public static MainAnalyserService create(String threadGroup)
    {
        mas = new MainAnalyserService(threadGroup);
        return mas;
    }

    public MainAnalyserService(String threadGroup)
    {
        super(threadGroup);
        initExecutorService();
    }

    /**
    Configure thread to match cpus but even if single cpu ensure have at least two threads to protect against
    scenario where there is only cpu and that thread is waiting on i/o rather than being cpu bound this would allow
    other thread to do something.
     */
    @Override
    protected void initExecutorService()
    {
        int workerSize = GeneralPreferences.getInstance().getWorkers();
        if(workerSize==0)
        {
            workerSize = Runtime.getRuntime().availableProcessors();
        }
        //Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
        if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }
        MainWindow.userInfoLogger.severe("Workers Configuration:"+ workerSize);
        MainWindow.logger.severe("Workers Configuration:"+ workerSize);

        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES,
                new EnsureIncreaseCountIfRunOnCallingThread());
    }

    public AtomicInteger getPendingItems()
    {
        return pendingItems;
    }

    /**
     * If queue is full this gets called and we log that we run task on local calling thread.
     */
    class EnsureIncreaseCountIfRunOnCallingThread implements RejectedExecutionHandler
    {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public EnsureIncreaseCountIfRunOnCallingThread() { }

        /**
         * Executes task on calling thread, ensuring we increment count
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown())
            {
                try
                {
                    MainWindow.userInfoLogger.severe(">>SubmittedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" + pendingItems.get());
                    r.run();
                    MainWindow.userInfoLogger.severe(">>CompletedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" +  pendingItems.get());
                }
                catch(Exception ex)
                {
                    MainWindow.userInfoLogger.log(Level.SEVERE, ex.getMessage(), ex);
                }
            }
        }
    }

    /**
     * Increase count and then Submit to ExecutorService
     *
     * @param callingTask
     * @param task
     */
    public void submit(Callable<Boolean> callingTask, Callable<Boolean> task) //throws Exception
    {
        //Ensure we increment before calling submit in case rejectionExecution comes into play
        int remainingItems = pendingItems.incrementAndGet();
        executorService.submit(task);
        MainWindow.userInfoLogger.severe(">>Submitted:" + task.getClass().getName() + ":" + remainingItems);
    }

    public ExecutorService getExecutorService()
    {
        return executorService;
    }

    /**
     * Must be called by Callable when it has finished work (or if error)
     *
     * @param task
     */
    public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.userInfoLogger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);
        if (remainingItems == 0)
        {
            MainWindow.userInfoLogger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

    /**
     * Wait for latch to close, this should occur once all submitted aysync tasks have finished in some way
     *
     * @throws InterruptedException
     */
    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}

调用类

代码语言:javascript
复制
//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

对于一个客户,即使仍然有未完成的任务,且executorservice只运行了8分钟,并且没有任务超时,terminated()方法仍在被调用。我也在当地看到过这个问题。

调试显示

UserLog

代码语言:javascript
复制
05/07/2019 11.29.38:EDT:SEVERE: ----G14922:The Civil War:8907617:American Songs of Revolutionary Times and the Civil War Era:NoScore
05/07/2019 11.29.38:EDT:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.SongSaver:69
05/07/2019 11.29.38:EDT:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:68
05/07/2019 11.29.38:EDT:SEVERE: >MainAnalyser Finished
05/07/2019 11.29.38:EDT:INFO: Stop

DebugLog

代码语言:javascript
复制
   05/07/2019 11.29.38:EDT:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker

因此,我们可以看到仍然有68个任务要完成,而且MainAnalyser还没有关闭闩锁,但是线程池执行器已经终止。

为了查看是否调用了关闭(),我重写了关闭(),

runWorker()正在调用runWorker(),则runWorker()应该在循环中继续,直到队列为空(而队列不是空的),但是某些东西似乎导致它离开循环,而processWorkerExit()在做了更多的检查之后,最终终止了整个执行器(而不仅仅是一个工作线程)。

代码语言:javascript
复制
10/07/2019 07.11.51:BST:MainAnalyserService:submit:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:809
10/07/2019 07.11.51:BST:MainAnalyserService:workDone:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.MusicBrainzSongGroupMatcher2:808
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.getStackTrace(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: com.jthink.songkong.analyse.analyser.TimeoutThreadPoolExecutor.terminated(TimeoutThreadPoolExecutor.java:118)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.tryTerminate(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.processWorkerExit(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.run(Unknown Source)

因为ThreadPoolExecutor是Standard的一部分,所以我不能(很容易)设置断点来尝试找出它在做什么,这是ThreadPoolExecutor代码(标准Jave不是我的代码)

代码语言:javascript
复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我们试验了执行器中的队列大小,默认情况下是100,因为我不希望它变得太大,因为队列任务将使用更多的内存,如果队列繁忙,我宁愿调用任务只运行自己。但是,为了解决这个问题(并删除由于队列满而需要调用CallerRunPolicy ),我将队列大小增加到1000,这导致错误发生得更快,然后完全取消限制,并继续更快地失败。

代码语言:javascript
复制
 new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),

我正在寻找一种替代ThreadExecutorPool的方法,偶然发现了ForkJoinPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

我注意到的一件事是,与外部提交表单相比,ForkJoinPool在提交给ForkJoinPool的任务中有不同的任务提交方法。我不知道这是为什么,但我想知道是否因为我从执行者运行的任务中提交任务,这会在某种程度上引起问题吗?

现在,我只需将代码复制/粘贴到新类中,重命名,并创建一个期望我的类而不是ThreadPoolExecutor运行的RejectedExcecutionhandler版本,就可以创建自己版本的ThreadPoolExecutor。

开始添加一些调试,看看我是否能破译发生了什么,有什么想法吗?

我添加了对processWorkerExit的调用

代码语言:javascript
复制
 MainWindow.userInfoLogger.severe("-----------------------"+getTaskCount()
                    +":"+getActiveCount()
                    +":"+w.completedTasks
                    +":"+ completedAbruptly);

失败了

代码语言:javascript
复制
-----------------------3686:0:593:false
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-07-11 07:03:09

很长一段时间以来,我一直认为问题一定出在我的代码上,然后我开始认为问题出在ThreadPoolExecutor上,但是将调试添加到我自己版本的runWorker()中就表明问题确实是我自己的代码。

代码语言:javascript
复制
 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                MainWindow.userInfoLogger.severe("-----------------------"+workQueue.size());

从这一点我可以看出,虽然工作队列通常会变长,并且与

代码语言:javascript
复制
MainThreadAnalyzer.pendingItems -noOfWorkerThreads

在一个特定的点上,这两个值发生了分歧,这就是SongLoader过程(错误地说我并没有考虑到)结束的时候。因此,MainThreadAnalyzer继续提交增加pendingItems值的工作,但是执行器的工作队列大小越来越小。

这导致我们意识到执行器之前已经关闭了很多,但是我们没有意识到这一点,因为只有在歌加载器关闭之后才检查闩锁。

其关闭的原因是,在MainAnalyzerThread的早期,比SongLoader提交工作更快,因此pendingItems的值被临时设置为零,从而关闭锁存器。

解决方案如下

添加一个布尔标志,以指示何时完成songLoader,并且只允许在设置此标志后关闭闩锁。

代码语言:javascript
复制
private boolean songLoaderCompleted = false;
public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.logger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);

        if (remainingItems == 0 && songLoaderCompleted)
        {
            MainWindow.logger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

然后在主线程中,在SongLoader完成后设置此标志

代码语言:javascript
复制
 //Start SongLoader
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);

//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all folder
//submissions completed to the MainAnalyserService
songLoaderService.shutdown();
songLoaderService.awaitTermination(10, TimeUnit.DAYS);
MainWindow.userInfoLogger.severe(">Song Loader Finished");

//Were now allowed to consider closing the latch because we know all songs have now been loaded
//so no false chance of zeroes
analyserService.setSongLoaderCompleted();

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

//This should be immediate as there should be no tasks still remaining
analyserService.getExecutorService().shutdown();
analyserService.getExecutorService().awaitTermination(10, TimeUnit.DAYS);
票数 2
EN

Stack Overflow用户

发布于 2019-07-11 07:09:36

你只是在滥用ExecutorService

你所做的(即使在你的“解决方案”中)是

  • 提交任务
  • 等待他们完成
  • 关机
  • 再次等待关机的发生(为什么会这样?)

你应该做的是:

  • 提交任务
  • 关闭执行器以不允许任何新任务
  • 等待终止-这将阻塞直到所有任务完成,或达到超时。

您应该检查awaitTermination的返回状态,因为

  • 如果为true,则所有任务都在给定超时之前完成。
  • 如果为false --并不是所有的任务都已经完成--在这种情况下,您可能不应该启动第二个池。

此外,有两个选项,如何使用线程执行器。您可以生成辅助线程,并让它们决定它们应该做什么--就像在工作线程中循环处理新任务一样。

或者(我更喜欢的),把你的工作应该做的事情打包成单独的任务(很可能是循环体中的任务),然后作为单独的任务提交到池中。ExecutorService将为您完成日程安排。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56937085

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档