The following TaskStack processes input elements asynchronously, using the function it receives at construction:
TaskStack<String, Integer> stack = new TaskStack<>(String::length);
In a realistic scenario, this function would be long-running.
We can add input elements to the stack so they get processed:
CompletableFuture<Integer> futOutput1 = stack.add("How's it going?");
CompletableFuture<Integer> futOutput2 = stack.add("duplicate input");
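CompletableFuture<Integer> futOutput3 = stack.add("duplicate input");
Now, the reason I created TaskStack instead of using a Guava or Caffeine cache is that I need the most recently added input to be processed first (i.e., last in, first out), and as far as I know, neither Guava nor Caffeine offers a LIFO cache.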
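TaskStack gets its LIFO order from the work queue it hands to its ThreadPoolExecutor rather than from a cache: the BlockingStack class at the bottom of the code turns every insertion into offerFirst/putFirst, and a LinkedBlockingDeque's take() and poll() hand out elements from the front. The sketch below is not part of TaskStack. It copies that class and queues three tasks behind a blocking task on a single worker thread to show the resulting order. LifoExecutorDemo and runOrder are hypothetical names, and the pool is pinned to one thread (core and maximum size 1, unlike TaskStack's createExecutor) so that the order is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LifoExecutorDemo {

    // Copy of TaskStack.BlockingStack: every insertion goes to the front of the deque,
    // and the executor's worker takes from the front, so the newest queued task runs next
    static class BlockingStack<E> extends LinkedBlockingDeque<E> {
        @Override
        public boolean offer(E e) {
            return offerFirst(e);
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return offerFirst(e, timeout, unit);
        }

        @Override
        public boolean add(E e) {
            return offerFirst(e);
        }

        @Override
        public void put(E e) throws InterruptedException {
            putFirst(e);
        }
    }

    // Queues "first", "second", "third" behind a blocking task on a single worker thread
    // and returns the order in which they actually ran
    public static List<String> runOrder() throws InterruptedException {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new BlockingStack<>());
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Occupy the only worker thread so the following tasks have to wait in the queue
        executor.execute(() -> {
            blockerStarted.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        blockerStarted.await();

        for (String name : Arrays.asList("first", "second", "third")) {
            executor.execute(() -> order.add(name));
        }
        release.countDown();

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOrder()); // prints [third, second, first]
    }
}
```

With a plain LinkedBlockingQueue in place of BlockingStack, the same program would run the tasks in FIFO order, from first to third.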
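Also, if an input has already been added, I want to avoid processing it twice when the same input is added again later. That's why futOutput2 and futOutput3 in the example above reference the same CompletableFuture.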
An important feature of TaskStack for me is being able to remove scheduled work:
stack.remove("How's it going?");
Calling remove does different things, depending on how far the processing of the input "How's it going?" has progressed:
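- If processing of the input hasn't started yet, the input is removed and won't be processed.
- If the input is currently being processed, its future completes with a java.util.concurrent.CancellationException (the running computation itself isn't interrupted).
- If processing has already finished, the input is removed, so a later call to add creates it again and reprocesses it.
Here's the code of TaskStack: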
import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A stack that accepts input elements which are processed asynchronously in a last in, first out (LIFO) order.
* <p>
* Created by Matthias Braun on 2017-12-07.
*/
public class TaskStack<T, R> {
// This function defines what it means to process input
private final Function<T, R> func;
/*
* Holds the input elements and the futures containing the output elements.
* We use soft references for the outputs to allow the garbage collector to free them if no one
* else is referencing them. This avoids memory issues when outputs occupy a lot of memory.
*/
private final Map<T, SoftReference<CompletableFuture<R>>> inputOutputMap = new ConcurrentHashMap<>();
// Provides threads to turn inputs into outputs
private final ExecutorService executor = createExecutor();
/**
* Creates a new {@link TaskStack}.
*
* @param func elements added to the {@link TaskStack} will be processed using this {@link Function}
*/
public TaskStack(Function<T, R> func) {
this.func = func;
}
/**
* Adds and processes the {@code input}.
*
* @param input we process this input, turning it into a value of type {@code R} in the future
* @return the result of processing the {@code input} wrapped in a {@link CompletableFuture}.
* If the {@code input} was already added to the {@link TaskStack} by a prior call to this method, we return
* the same {@link CompletableFuture} as before
* @see #remove
*/
public CompletableFuture<R> add(T input) {
return fold(getOpt(inputOutputMap, input),
// We don't have a precomputed result for the input -> Start processing
() -> {
// Process the input on one of the threads provided by the executor
CompletableFuture<R> futureOutput = CompletableFuture.supplyAsync(() -> func.apply(input), executor);
// Wrap the future result in a soft reference and put it in the map so the result can be
// garbage collected once no one else is referencing it
inputOutputMap.put(input, new SoftReference<>(futureOutput));
return futureOutput;
},
// There's already a result for the input, but it may have been garbage collected
existingSoftFuture -> fold(existingSoftFuture,
() -> {
// The result was already garbage collected ->
// Remove the input too and make this method reprocess the input
inputOutputMap.remove(input);
return add(input);
},
// We can return the result without reprocessing the input
future -> future
)
);
}
/**
* Removes the {@code input} from this {@link TaskStack} meaning the {@code input} won't be processed.
* If the {@code input} was already processed, subsequent calls to {@link #add} will cause the {@code input} to be
* processed again.
*
* @param input we remove this from the {@link TaskStack}
*/
public void remove(T input) {
getOpt(inputOutputMap, input).ifPresent(softFuture -> {
inputOutputMap.remove(input);
ifPresent(softFuture,
// CompletableFuture ignores the mayInterruptIfRunning flag
future -> future.cancel(false));
});
}
private static ExecutorService createExecutor() {
// How many threads should process input at maximum. Have as many threads as there are processors minus one,
// but at least one thread
int maxPoolSize = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
// When the number of threads is greater than the core, this is the maximum time that excess idle threads will
// wait for new tasks before terminating
long keepAliveTime = 3;
TimeUnit timeUnit = TimeUnit.SECONDS;
// It's the stack that makes the executor assign threads to the submitted tasks in a last in, first out order
return new ThreadPoolExecutor(0, maxPoolSize, keepAliveTime, timeUnit, new BlockingStack<>());
}
/**
* Applies the referent of {@code ref} to {@code ifPresent} if {@code ref} is not null. Otherwise, does nothing.
*
* @param ref the {@link SoftReference} whose referent we apply to {@code ifPresent} if it's not null
* @param ifPresent the {@link Consumer} to which we pass the non-null referent of {@code ref}
* @param <T> the type of {@code ref}'s referent
*/
private static <T> void ifPresent(SoftReference<T> ref, Consumer<T> ifPresent) {
T referent = ref.get();
if (referent != null) {
ifPresent.accept(referent);
}
}
/**
* Applies the referent of {@code ref} to {@code ifPresent} if {@code ref} is not null. Otherwise,
* calls {@code ifAbsent}.
*
* @param ref the {@link SoftReference} whose referent we apply to {@code ifPresent} if it's not null
* @param ifAbsent if {@code ref}'s referent is null, we call this {@link Supplier} to produce a value of type
* {@code Res}
* @param ifPresent the {@link Function} to which we pass the non-null referent of {@code ref} to produce a value
* of type {@code Res}
* @param <T> the type of {@code ref}'s referent
* @param <Res> the type of the value produced by both {@code ifAbsent} and {@code ifPresent}
* @return a value of type {@code Res}
*/
private static <T, Res> Res fold(SoftReference<T> ref, Supplier<Res> ifAbsent, Function<T, Res> ifPresent) {
T referent = ref.get();
final Res result;
if (referent == null) {
result = ifAbsent.get();
} else {
result = ifPresent.apply(referent);
}
return result;
}
/**
* Gets the value associated with {@code key} from the {@code map} or an {@link Optional#empty()} if the {@code map}
* doesn't contain the {@code key}.
*
* @param map we get the value from this {@link Map}
* @param key we get the value associated with this {@code key}
* @param <K> the {@code key}'s type
* @param <V> the value's type
* @return the value associated with {@code key} or an {@link Optional#empty()} if the {@code map}
* doesn't contain the {@code key}
*/
private static <K, V> Optional<V> getOpt(final Map<K, V> map, final K key) {
return Optional.ofNullable(map.get(key));
}
/**
* Applies the value of {@code optional} to {@code ifPresent} if it's present. Otherwise, calls {@code ifAbsent}.
*
* @param optional the {@link Optional} whose value we apply to {@code ifPresent} if it's present
* @param ifAbsent if {@code optional}'s value is absent, we call this {@link Supplier} to produce a value of type
* {@code Res}
* @param ifPresent the {@link Function} to which we pass the value of {@code optional} to produce a value
* of type {@code Res}
* @param <T> the type of {@code optional}'s value
* @param <Res> the type of the value produced by both {@code ifAbsent} and {@code ifPresent}
* @return a value of type {@code Res}
*/
private static <T, Res> Res fold(Optional<T> optional, Supplier<Res> ifAbsent, Function<T, Res> ifPresent) {
final Res result;
if (optional.isPresent()) {
result = ifPresent.apply(optional.get());
} else {
result = ifAbsent.get();
}
return result;
}
/**
* A stack that will block when it's full and clients try to add new elements to it.
* Being a stack, it adds new elements in a last in first out manner: We put the most recently added elements at the
* first position in the stack.
* <p>
* If its capacity is unspecified, it defaults to {@link Integer#MAX_VALUE}.
*
* @param <E> the elements inside the {@link BlockingStack}
*/
public static class BlockingStack<E> extends LinkedBlockingDeque<E> {
@Override
public boolean offer(E e) {
return offerFirst(e);
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return offerFirst(e, timeout, unit);
}
@Override
public boolean add(E e) {
return offerFirst(e);
}
@Override
public void put(E e) throws InterruptedException {
putFirst(e);
}
}
}
I welcome feedback on every aspect of the code, especially on any concurrency bugs I might have missed.
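The remove behavior described in the question comes down to CompletableFuture.cancel(false). The self-contained sketch below uses plain CompletableFutures instead of a TaskStack (CancelDemo and describe are hypothetical names): cancelling a future that hasn't completed yet makes get() throw a CancellationException, while cancelling one that already holds a result does nothing and returns false. As the comment in remove notes, the mayInterruptIfRunning flag has no effect on a CompletableFuture, so a computation that is already running isn't interrupted; only its future is completed with the exception.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CancelDemo {

    // Describes what get() reports for a future: its result, a cancellation, or a failure
    public static String describe(CompletableFuture<Integer> future) throws InterruptedException {
        try {
            return "result " + future.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Not completed yet (like an input still waiting in the stack): cancel succeeds
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        System.out.println(pending.cancel(false) + " " + describe(pending)); // prints "true cancelled"

        // Already completed (like an input that was processed): cancel has no effect
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(15);
        System.out.println(done.cancel(false) + " " + describe(done)); // prints "false result 15"
    }
}
```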
Posted on 2017-12-15 13:58:40
Based on your description, what you have is what I would consider a concurrency bug. It's an edge case, but most concurrency bugs are.
You said:
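Also, if an input has already been added, I want to avoid processing it twice when the same input is added again later. That's why futOutput2 and futOutput3 in the example above reference the same CompletableFuture.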
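So you want to avoid processing an input twice, but must that be avoided in every case? You're using a ConcurrentHashMap, but it isn't clear whether multiple threads will be adding to the stack. If you are using multiple threads and processing must never happen twice, you may run into problems.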
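You aren't doing anything explicit to protect your ConcurrentHashMap; instead, you rely on its internal protection to make sure it doesn't end up in an inconsistent state. That's fine, but the map has no context for what you're doing with it: looking up an input and later putting a new future for it are two separate operations. As a result, the same item can be added to the TaskStack from different threads in a way that makes the task run twice. This can be demonstrated with the following code: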
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TSReviewApp {
private class ThreadedEvaluator extends Thread {
private final String valueToAdd;
private final TaskStack<String, Integer> stack;
// volatile, so the main thread sees the future assigned in run()
private volatile CompletableFuture<Integer> result;
public ThreadedEvaluator(TaskStack<String, Integer> stack, String valueToAdd) {
this.stack = stack;
this.valueToAdd = valueToAdd;
}
@Override
public void run() {
result = stack.add(valueToAdd);
}
public Integer result() throws ExecutionException, InterruptedException {
return result.get();
}
public CompletableFuture<Integer> future() {
return result;
}
}
private static int GetLengthQuick(String str) {
return str.length();
}
private static int GetLengthSlow(String str) {
try {
System.out.println("Thinking about: " + str);
Thread.sleep(2000);
System.out.println("Evaluated: " + str);
} catch (InterruptedException ex) {
System.out.println("Sleep interrupted");
}
return str.length();
}
private void Go() {
TaskStack<String, Integer> stack = new TaskStack<>(TSReviewApp::GetLengthSlow);
ThreadedEvaluator futOutput1 = new ThreadedEvaluator(stack,"How is it going?");
ThreadedEvaluator duplicateFuture1 = new ThreadedEvaluator(stack,"duplicate input");
ThreadedEvaluator duplicateFuture2 = new ThreadedEvaluator(stack,"duplicate input");
ThreadedEvaluator duplicateFuture3 = new ThreadedEvaluator(stack,"duplicate input");
futOutput1.start();
duplicateFuture1.start();
duplicateFuture2.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("MainMethod sleep exception");
e.printStackTrace();
}
try {
System.out.println(futOutput1.result());
// If the same future were used, this would evaluate to *true*. It doesn't,
// because two futures were created: one that is still in the map and one
// that isn't.
System.out.println("future1 == future 2? " + (duplicateFuture1.future() == duplicateFuture2.future()));
System.out.println(duplicateFuture1.result());
System.out.println(duplicateFuture2.result());
} catch (Exception ex) {
System.out.println(ex);
}
duplicateFuture3.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("MainMethod sleep 2 exception");
e.printStackTrace();
}
try {
System.out.println(duplicateFuture3.result());
// Because one of the previous futures will still be in the map,
// one of the next two checks will return true
System.out.println("future3 == future 1? " + (duplicateFuture1.future() == duplicateFuture3.future()));
System.out.println("future3 == future 2? " + (duplicateFuture2.future() == duplicateFuture3.future()));
} catch (Exception ex) {
System.out.println(ex);
}
}
public static void main(String[] args) {
TSReviewApp app = new TSReviewApp();
app.Go();
}
}
Its output looks something like this:
Thinking about: How is it going?
Evaluated: How is it going?
Thinking about: duplicate input
16
future1 == future 2? false
Evaluated: duplicate input
Thinking about: duplicate input
Evaluated: duplicate input
15
15
15
future3 == future 1? true
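future3 == future 2? false
The application processes two strings, "How is it going?" and "duplicate input". The thing to note in the output is that when the initial three items are added and run (from different threads), they cause three rounds of processing ("duplicate input" is processed twice). If "duplicate input" is added again later, the existing entry in the map is used, so the processing doesn't have to happen again. How much does this matter to you?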
You can detect from within your TaskStack that you've hit this situation by checking the value returned by the put call:
public CompletableFuture<R> add(T input) {
return fold(getOpt(inputOutputMap, input),
// We don't have a precomputed result for the input -> Start processing
() -> {
// Process the input on one of the threads provided by the executor
CompletableFuture<R> futureOutput = CompletableFuture.supplyAsync(() -> func.apply(input), executor);
// Wrap the future result in a soft reference and put it in the map so the result can be
// garbage collected once no one else is referencing it
//---->
SoftReference<CompletableFuture<R>> previousValue = inputOutputMap.put(input, new SoftReference<>(futureOutput));
if(null != previousValue) {
System.out.println("Added '" + input + "' to map when it was already present");
}
//---->
return futureOutput;
},
// There's already a result for the input, but it may have been garbage collected
existingSoftFuture -> fold(existingSoftFuture,
() -> {
// The result was already garbage collected ->
// Remove the input too and make this method reprocess the input
inputOutputMap.remove(input);
return add(input);
},
// We can return the result without reprocessing the input
future -> future
)
);
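}
Again, this may be acceptable for your use case (if it's fine to occasionally process an input more than once, if you only ever add from one thread, or if duplicate inputs are unlikely), or you may need to consider some kind of locking around the read/write.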
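One way to get that locking without an explicit lock is ConcurrentHashMap.compute, whose whole invocation is performed atomically for a given key, so the "is there a live future for this input?" check and the insertion of a new one can't be interleaved by another thread. The following is a minimal sketch, not the asker's or the answerer's code: AtomicAddSketch and callsForConcurrentAdds are hypothetical names, it leaves out remove and the LIFO executor, and it assumes it's acceptable to call supplyAsync inside compute. supplyAsync only schedules the work, which keeps the atomic section short, as the ConcurrentHashMap documentation asks.

```java
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical, stripped-down stand-in for TaskStack that only shows an atomic add()
public class AtomicAddSketch<T, R> {
    private final Function<T, R> func;
    private final ExecutorService executor;
    private final ConcurrentHashMap<T, SoftReference<CompletableFuture<R>>> inputOutputMap =
            new ConcurrentHashMap<>();

    public AtomicAddSketch(Function<T, R> func, ExecutorService executor) {
        this.func = func;
        this.executor = executor;
    }

    public CompletableFuture<R> add(T input) {
        // Holds a strong reference to the future so it can't be cleared before we return it
        AtomicReference<CompletableFuture<R>> result = new AtomicReference<>();
        // compute() runs the lambda atomically for this key: two threads adding the same input
        // can't both see "no future yet" and both start processing
        inputOutputMap.compute(input, (key, softFuture) -> {
            CompletableFuture<R> existing = softFuture == null ? null : softFuture.get();
            if (existing != null) {
                result.set(existing);
                return softFuture; // reuse the future that is still reachable
            }
            // No entry, or its future was garbage collected: start processing now
            CompletableFuture<R> created = CompletableFuture.supplyAsync(() -> func.apply(key), executor);
            result.set(created);
            return new SoftReference<>(created);
        });
        return result.get();
    }

    // Adds the same input from several threads at once and returns how often func ran
    public static int callsForConcurrentAdds(int threads) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        ExecutorService adders = Executors.newFixedThreadPool(threads);
        AtomicInteger calls = new AtomicInteger();
        AtomicAddSketch<String, Integer> stack = new AtomicAddSketch<>(s -> {
            calls.incrementAndGet();
            return s.length();
        }, workers);

        List<Future<CompletableFuture<Integer>>> added = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            added.add(adders.submit(() -> stack.add("duplicate input")));
        }
        for (Future<CompletableFuture<Integer>> f : added) {
            f.get().get(); // wait until the processing behind each returned future has finished
        }
        adders.shutdown();
        workers.shutdown();
        return calls.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callsForConcurrentAdds(8)); // prints 1
    }
}
```

Compared with adding a lock around the whole of add, compute only blocks threads that touch the same key (or the same hash bin), so adds of unrelated inputs can still proceed concurrently.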
https://codereview.stackexchange.com/questions/182579