我已经实现了一个ThreadPoolExecutor,它只对尚未使用的元素运行Consumer<T>。此代码使用Java 8。
这背后的背景是,我每扫描一个目录的x时间单位,为文件存在,我必须保持100%的准确性在寻找文件和其他机制,如JNotify或简单的普通WatcherService没有达到这一点。
public class SingleExecutionThreadPoolExecutor<E> extends ThreadPoolExecutor {
private final Consumer<E> consumer;
private final List<E> elementsInProcess = new ArrayList<>();
public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.consumer = consumer;
}
public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.consumer = consumer;
}
public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.consumer = consumer;
}
public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.consumer = consumer;
}
public void execute(E element) {
if (!elementsInProcess.contains(element)) {
super.execute(() -> consumer.accept(element));
elementsInProcess.add(element);
}
}
@Override
@Deprecated
public void execute(Runnable command) { }
}它使用以下类和片段进行调用:
public final class UniqueTimePath {
private final Path path;
private final FileTime fileTime;
public UniqueTimePath(final Path path) {
this.path = path;
try {
this.fileTime = Files.getLastModifiedTime(path);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
public Path getPath() {
return path;
}
public FileTime getFileTime() {
return fileTime;
}
@Override
public int hashCode() {
int hash = 3;
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final UniqueTimePath other = (UniqueTimePath) obj;
if (!Objects.equals(this.path, other.path)) {
return false;
}
if (!Objects.equals(this.fileTime, other.fileTime)) {
return false;
}
return true;
}
@Override
public String toString() {
return "{" + path + ", " + fileTime + "}";
}
}private SingleExecutionThreadPoolExecutor<UniqueTimePath> executor;
//...
executor = new SingleExecutionThreadPoolExecutor<>(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), fileConsumer);
//...
Files.list(directory)
.map(UniqueTimePath::new)
.forEach(executor::execute);发布于 2014-04-03 09:59:06
ThreadPoolExecutor的内部结构,因此在将来的发行版中,submit()方法可能不再调用已重写的空execute()方法,或者它们可以提供绕过您的execute()的新的公共execute方法。另见:有效Java,第二版,第16项:喜欢组合而不是继承consumer是否为null,并在那里抛出一个IllegalArgumentException或NullPointerException。如果它仍然是null,那么您将在execute方法的后面得到一个异常。立即抛出异常对调试有很大帮助,因为您可以从另一个线程获得包含错误客户端(而不仅仅是NullPointerException )帧的堆栈跟踪。(实用程序员:从熟练工人到大师,安德鲁亨特和大卫托马斯:死程序告诉不谎言。)elementsInProcess列表,而不是ArrayList来满足这种期望。HashMap:@重载公共int hashCode() { int散列= 3;返回散列;}考虑实现适当的hashCode。发布于 2014-04-03 12:15:42
我同意@SimonAndréForsberg和@palacsint的回答,但是我重新考虑了设计,并注意到我甚至不想在ThreadPoolExecutor中真正使用构图。
我已经将代码重构到下面的类中,其中有以下假设:
Consumer<T>已经为人所知。Executor已经被实例化了,在本例中是ThreadPoolExecutor的实例化。public class SingleExecutionExecutorBridge<T> implements Consumer<T> {
private final Consumer<T> consumer;
private final Executor executor;
private final Set<T> elementsInProcess = Collections.synchronizedSet(new HashSet<>());
public SingleExecutionExecutorBridge(final Consumer<T> consumer, final Executor executor) {
this.consumer = Objects.requireNonNull(consumer);
this.executor = Objects.requireNonNull(executor);
}
@Override
public void accept(final T element) {
if (!elementsInProcess.add(element)) {
return;
}
executor.execute(() -> consumer.accept(element));
}
}使用这种方法,所有东西都可以很好地解耦,下面的代码片段现在是相关的:
private final SingleExecutionExecutorBridge<UniqueTimePath> singleExecutionExecutorBridge;
private ThreadPoolExecutor executor;
//...
public BaseChecker(final Path directory, final Consumer<UniqueTimePath> fileConsumer, final Path configFile) {
this.directory = Objects.requireNonNull(directory);
this.configFile = Objects.requireNonNull(configFile);
this.subClass = this.getClass();
this.singleExecutionExecutorBridge = new SingleExecutionExecutorBridge<>(Objects.requireNonNull(fileConsumer), executor);
}
//...
executor = new ThreadPoolExecutor(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
//...
Files.list(directory)
.map(UniqueTimePath::new)
.forEach(singleExecutionExecutorBridge::accept);https://codereview.stackexchange.com/questions/46159
复制相似问题