首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >扩展ThreadPoolExecutor

扩展ThreadPoolExecutor
EN

Code Review用户
提问于 2014-04-03 08:03:38
回答 2查看 2.3K关注 0票数 11

我已经实现了一个ThreadPoolExecutor,它只对尚未使用的元素运行Consumer<T>。此代码使用Java 8。

这背后的背景是,我每扫描一个目录的x时间单位,为文件存在,我必须保持100%的准确性在寻找文件和其他机制,如JNotify或简单的普通WatcherService没有达到这一点。

代码语言:javascript
复制
public class SingleExecutionThreadPoolExecutor<E> extends ThreadPoolExecutor {
    private final Consumer<E> consumer;
    private final List<E> elementsInProcess = new ArrayList<>();

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.consumer = consumer;
    }

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.consumer = consumer;
    }

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.consumer = consumer;
    }

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.consumer = consumer;
    }

    public void execute(E element) {
        if (!elementsInProcess.contains(element)) {
            super.execute(() -> consumer.accept(element));
            elementsInProcess.add(element);
        }
    }

    @Override
    @Deprecated
    public void execute(Runnable command) { }
}

它使用以下类和片段进行调用:

代码语言:javascript
复制
public final class UniqueTimePath {
    private final Path path;
    private final FileTime fileTime;

    public UniqueTimePath(final Path path) {
        this.path = path;
        try {
            this.fileTime = Files.getLastModifiedTime(path);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public Path getPath() {
        return path;
    }

    public FileTime getFileTime() {
        return fileTime;
    }

    @Override
    public int hashCode() {
        int hash = 3;
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final UniqueTimePath other = (UniqueTimePath) obj;
        if (!Objects.equals(this.path, other.path)) {
            return false;
        }
        if (!Objects.equals(this.fileTime, other.fileTime)) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "{" + path + ", " + fileTime + "}";
    }
}
代码语言:javascript
复制
private SingleExecutionThreadPoolExecutor<UniqueTimePath> executor;
//...
executor = new SingleExecutionThreadPoolExecutor<>(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), fileConsumer);
//...
Files.list(directory)
        .map(UniqueTimePath::new)
        .forEach(executor::execute);
EN

回答 2

Code Review用户

发布于 2014-04-03 09:59:06

  1. 我肯定会在这里使用组合而不是继承。它也将消除以下丑陋的(和臭味)代码:@Override @Deprecated公共void (Runnable命令){} JDK开发人员可以更改ThreadPoolExecutor的内部结构,因此在将来的发行版中,submit()方法可能不再调用已重写的空execute()方法,或者它们可以提供绕过您的execute()的新的公共execute方法。另见:有效Java,第二版,第16项:喜欢组合而不是继承
  2. 代码应该检查构造函数中的consumer是否为null,并在那里抛出一个IllegalArgumentExceptionNullPointerException。如果它仍然是null,那么您将在execute方法的后面得到一个异常。立即抛出异常对调试有很大帮助,因为您可以从另一个线程获得包含错误客户端(而不仅仅是NullPointerException )帧的堆栈跟踪。(实用程序员:从熟练工人到大师,安德鲁亨特和大卫托马斯:死程序告诉不谎言。)
  3. 大多数开发人员都希望使用从不同线程提交作业是线程安全的。,因此我会使用一个适当的同步elementsInProcess列表,而不是ArrayList来满足这种期望。
  4. 以下内容将导致如果在HashMap:@重载公共int hashCode() { int散列= 3;返回散列;}考虑实现适当的hashCode
票数 10
EN

Code Review用户

发布于 2014-04-03 12:15:42

我同意@SimonAndréForsberg和@palacsint的回答,但是我重新考虑了设计,并注意到我甚至不想在ThreadPoolExecutor中真正使用构图。

我已经将代码重构到下面的类中,其中有以下假设:

  • Consumer<T>已经为人所知。
  • Executor已经被实例化了,在本例中是ThreadPoolExecutor的实例化。
代码语言:javascript
复制
public class SingleExecutionExecutorBridge<T> implements Consumer<T> {
    private final Consumer<T> consumer;
    private final Executor executor;

    private final Set<T> elementsInProcess = Collections.synchronizedSet(new HashSet<>());

    public SingleExecutionExecutorBridge(final Consumer<T> consumer, final Executor executor) {
        this.consumer = Objects.requireNonNull(consumer);
        this.executor = Objects.requireNonNull(executor);
    }

    @Override
    public void accept(final T element) {
        if (!elementsInProcess.add(element)) {
            return;
        }
        executor.execute(() -> consumer.accept(element));
    }
}

使用这种方法,所有东西都可以很好地解耦,下面的代码片段现在是相关的:

代码语言:javascript
复制
private final SingleExecutionExecutorBridge<UniqueTimePath> singleExecutionExecutorBridge;
private ThreadPoolExecutor executor;

//...

public BaseChecker(final Path directory, final Consumer<UniqueTimePath> fileConsumer, final Path configFile) {
    this.directory = Objects.requireNonNull(directory);
    this.configFile = Objects.requireNonNull(configFile);
    this.subClass = this.getClass();
    this.singleExecutionExecutorBridge = new SingleExecutionExecutorBridge<>(Objects.requireNonNull(fileConsumer), executor);
}

//...

executor = new ThreadPoolExecutor(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

//...

Files.list(directory)
        .map(UniqueTimePath::new)
        .forEach(singleExecutionExecutorBridge::accept);
票数 4
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/46159

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档