首页
学习
活动
专区
圈层
工具
发布

线程池
EN

Code Review用户
提问于 2021-04-15 11:51:21
回答 2查看 118关注 0票数 3

我在工作中遇到了一个问题,那就是我们有一些任务需要尽快执行。为此,我们实现了它们,因此它们在ExecutorService中是多线程的。最初,我们对每种不同类型的任务都有不同的ExecutorService (简单Executors.newFixedThreadPool(cpuCount))。然而,这些任务只有很短的时间间隔。因此,我们不希望所有线程同时运行,而是希望它们超时。而且,不同类型的任务不太可能同时运行。

我想出的解决方案是实现我自己的AbstractExecutorService,它将工作委托给一个ThreadPools池,这样他们就可以超时,而且空闲线程的数量也最少。请给我反馈的概念,实现,编码风格,评论和任何其他你可以想到的。

代码语言:javascript
复制
package com.anon.exec;

import static java.util.stream.Collectors.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This is a utility class to have a pool of thread pools.
 * It is used so that you can have the minimal number of threads running at the same time while at the same time providing multiple thread pools with equal priority
 * so that tasks scheduled together are similarly finished instead of having to wait for unrelated tasks.
 *
 * The class is designed to time out the threads in the thread pool.
 */
public class ThreadPoolPool extends AbstractExecutorService {

    private final List<ThreadPoolExecutor> internalExecutors;

    /**
     * Create a new pool of Thread Pools. You can specify the number of pools and the number of threads per pool.
     * @param poolCount the number of pools to create
     * @param poolSize the number of threads per pool
     * @param nameScheme this name is appended to the front of the thread name
     * @param threadTimeout how long until the thread times out
     * @param threadTimeoutUnit what unit the threadTimeout is in
     */
    public ThreadPoolPool(final int poolCount, final int poolSize, String nameScheme, final long threadTimeout, final TimeUnit threadTimeoutUnit) {
        if (poolCount < 1) {
            throw new IllegalArgumentException("poolCount must be at least 1");
        }
        if (poolSize < 1) {
            throw new IllegalArgumentException("poolSize must be at least 1");
        }
        if (nameScheme == null) {
            nameScheme = "ThreadPoolPool-" + hashCode();
        }
        if (threadTimeout < 1) {
            throw new IllegalArgumentException("threadTimeout must be at least 1");
        }
        Objects.requireNonNull(threadTimeoutUnit, "threadTimeoutUnit must not be null");

        this.internalExecutors = Collections.synchronizedList(new ArrayList<>(poolCount));
        for (int i = 0; i < poolCount; i++) {
            final ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(nameScheme + "-pool-" + i + "-thread-%d").build();
            ThreadPoolExecutor exec = new ThreadPoolExecutor(poolSize, poolSize, threadTimeout, threadTimeoutUnit, new LinkedBlockingQueue<>(), factory);
            exec.allowCoreThreadTimeOut(true);
            this.internalExecutors.add(exec);
        }
    }

    @Override
    public void shutdown() {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                pool.shutdown();
            }
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        synchronized (this.internalExecutors) {
            return this.internalExecutors.stream().map(ExecutorService::shutdownNow).flatMap(List::stream).collect(toList());
        }
    }

    @Override
    public boolean isShutdown() {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                if (!pool.isShutdown()) {
                    return false;
                }
            }
            return true;
        }
    }

    @Override
    public boolean isTerminated() {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                if (!pool.isTerminated()) {
                    return false;
                }
            }
            return true;
        }
    }

    @Override
    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                if (!pool.awaitTermination(timeout, unit)) {
                    return false;
                }
            }
            return true;
        }
    }

    @Override
    public void execute(final Runnable command) {
        ThreadPoolExecutor exec = getAvailableExecutor(1);
        exec.execute(command);
    }

    @Override
    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAny(tasks, timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAll(tasks);
    }

    @Override
    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAll(tasks, timeout, unit);
    }

    /**
     * Finds an executor that can execute the given number of tasks. If there isn't one that can fit all tasks, the executor with the smallest current number of active tasks is returned.
     * @param taskCount the number of tasks you want to execute
     * @return a executor to execute your tasks
     */
    public ThreadPoolExecutor getAvailableExecutor(final int taskCount) {
        synchronized (this.internalExecutors) {
            int minActiveCount = Integer.MAX_VALUE;
            ThreadPoolExecutor minPool = null;
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                int activeCount = pool.getActiveCount();
                if (activeCount < minActiveCount) {
                    minPool = pool;
                    minActiveCount = activeCount;
                }
                if (activeCount + taskCount < pool.getMaximumPoolSize()) {
                    return pool;
                }
            }
            if (minPool != null) {
                return minPool;
            }
            else {
                return this.internalExecutors.stream().findAny() // we check in the constructor that we have at least one executor
                        .orElseThrow(() -> new IllegalStateException("ThreadPoolPool has no Executors, this is an illegal state"));
            }
        }
    }

}

```
代码语言:javascript
复制
EN

回答 2

Code Review用户

发布于 2021-10-19 14:04:03

总的来说,我认为这段代码是可读的和可理解的。我没有太多的抱怨

也就是说,awaitTermination依次为每个子池等待--这意味着如果您有多个池,您可能要等待几倍于指定的超时时间。如果来电者说要等待到100, TimeUnit.MILLISECONDS,那么我们的每个池都可以让我们等待99 ms才能结束,我们仍然会声称我们在超时之前就完成了--但如果我们有200个池,那么在阻塞了将近20秒之后,我们就会这样做,尽管我们应该更早地超时。这似乎不太理想

此外,getAvailableExecutor有足够的内部感觉,我想知道它是否真的应该是public?没有其他公共方法公开任何子池,这让人感到奇怪。

票数 2
EN

Code Review用户

发布于 2021-09-19 11:06:10

从需求开始,我不清楚为什么“我们不希望所有线程同时运行,而是希望它们超时。”

如果我正确理解你的目标,你想要实现对执行者的速率限制。它可以用于优化吞吐量和减少运行在同一台机器上的噪声邻居。

API有不同的设计选择。对于execute(),您可以选择如何处理多余的任务:将它们放入队列、阻止调用代码或拒绝。这实际上取决于用例和上下文。

我建议看看番石榴的RateLimiter信号量

票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/259562

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档