Stack for asynchronous processing

Code Review user
Asked on 2017-12-12 09:55:00
Answers: 1, Views: 229, Followers: 0, Votes: 2

The following TaskStack processes input elements asynchronously, using the function it receives at construction:

TaskStack<String, Integer> stack = new TaskStack<>(String::length);

In a realistic scenario, this function would be long-running.

We can add input elements to the stack so that they get processed:

CompletableFuture<Integer> futOutput1 = stack.add("How's it going?");
CompletableFuture<Integer> futOutput2 = stack.add("duplicate input");
CompletableFuture<Integer> futOutput3 = stack.add("duplicate input");

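Now, the reason I created TaskStack instead of using a Guava cache or a Caffeine cache is that I need to process the most recently added inputs first (i.e., last in, first out). As far as I know, neither Guava nor Caffeine offers a LIFO cache.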

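Also, if an input has already been added, I want to avoid processing it twice when the same input is added again later. That's why futOutput2 and futOutput3 in the example above reference the same CompletableFuture.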

An important feature of TaskStack for me is being able to remove scheduled work:

stack.remove("How's it going?");

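What calling remove does depends on the processing state of the input "How's it going?":

  • If processing hasn't started yet, the input won't be processed.
  • If processing has already finished and we have a result, the result is thrown away and has to be created again by calling add.
  • If the input is currently being processed, processing isn't interrupted, but the future completes with a java.util.concurrent.CancellationException.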
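The first and third bullets rest on how CompletableFuture.cancel behaves. Here is a minimal sketch using only the JDK. The class CancelBeforeStartDemo is hypothetical and not part of TaskStack. It queues a supplyAsync task behind a blocking task on a single-thread executor, cancels the future before the task starts, and checks two things: join() throws a CancellationException, and the supplier never runs. The second check depends on the queued task skipping the supplier once its future is already completed. That is how the OpenJDK implementation behaves, but I'm not aware of the Javadoc promising it, so treat it as an assumption.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of TaskStack
class CancelBeforeStartDemo {

    /** Returns how often the supplier ran after its future was cancelled while still queued. */
    static int runDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger invocations = new AtomicInteger();
        try {
            // Occupy the only worker thread so the next task has to wait in the queue
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                invocations.incrementAndGet();
                return "How's it going?".length();
            }, executor);

            // Cancel while the task is still queued; CompletableFuture ignores the flag
            future.cancel(false);
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);

            boolean cancelledOnJoin;
            try {
                future.join();
                cancelledOnJoin = false;
            } catch (CancellationException expected) {
                cancelledOnJoin = true;
            }
            if (!cancelledOnJoin) {
                throw new IllegalStateException("expected join() to throw CancellationException");
            }
            return invocations.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Supplier calls after cancel: " + runDemo());
    }
}
```

The executor still dequeues the cancelled task, so a thread is occupied for a moment. Under the assumption above, though, func itself is never called.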

这是TaskStack的代码:

import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * A stack that accepts input elements which are processed asynchronously in a last in, first out (LIFO) order.
 * <p>
 * Created by Matthias Braun on 2017-12-07.
 */
public class TaskStack<T, R> {

    // This function defines what it means to process input
    private final Function<T, R> func;

    /*
     * Holds the input elements and the futures containing the output elements.
     * We use soft references for the outputs to allow the garbage collector to free them if no one
     * else is referencing them. This avoids memory issues when outputs occupy a lot of memory.
     */
    private final Map<T, SoftReference<CompletableFuture<R>>> inputOutputMap = new ConcurrentHashMap<>();

    // Provides threads to turn inputs into outputs
    private final ExecutorService executor = createExecutor();

    /**
     * Creates a new {@link TaskStack}.
     *
     * @param func elements added to the {@link TaskStack} will be processed using this {@link Function}
     */
    public TaskStack(Function<T, R> func) {
        this.func = func;
    }

    /**
     * Adds and processes the {@code input}.
     *
     * @param input we process this input, turning it into a value of type {@code R} in the future
     * @return the result of processing the {@code input} wrapped in a {@link CompletableFuture}.
     * If the {@code input} was already added to the {@link TaskStack} by a prior call to this method, we return
     * the same {@link CompletableFuture} as before
     * @see #remove
     */
    public CompletableFuture<R> add(T input) {

        return fold(getOpt(inputOutputMap, input),
                // We don't have a precomputed result for the input -> Start processing
                () -> {
                    // Process the input on one of the threads provided by the executor
                    CompletableFuture<R> futureOutput = CompletableFuture.supplyAsync(() -> func.apply(input), executor);

                    // Wrap the future result in a soft reference and put it in the map so the result can be
                    // garbage collected once no one else is referencing it
                    inputOutputMap.put(input, new SoftReference<>(futureOutput));

                    return futureOutput;
                },
                // There's already a result for the input, but it may have been garbage collected
                existingSoftFuture -> fold(existingSoftFuture,
                        () -> {
                            // The result was already garbage collected ->
                            // Remove the input too and make this method reprocess the input
                            inputOutputMap.remove(input);
                            return add(input);
                        },
                        // We can return the result without reprocessing the input
                        future -> future
                )
        );
    }

    /**
     * Removes the {@code input} from this {@link TaskStack} meaning the {@code input} won't be processed.
     * If the {@code input} was already processed, subsequent calls to {@link #add} will cause the {@code input} to be
     * processed again.
     *
     * @param input                 we remove this from the {@link TaskStack}
     */
    public void remove(T input) {
        getOpt(inputOutputMap, input).ifPresent(softFuture -> {
            inputOutputMap.remove(input);

            ifPresent(softFuture,
                    // CompletableFuture ignores the mayInterruptIfRunning flag
                    future -> future.cancel(false));
        });
    }

    private static ExecutorService createExecutor() {

        // How many threads should process input at maximum. Have as many threads as there are processors minus one,
        // but at least one thread
        int maxPoolSize = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);

        // When the number of threads is greater than the core, this is the maximum time that excess idle threads will
        // wait for new tasks before terminating
        long keepAliveTime = 3;
        TimeUnit timeUnit = TimeUnit.SECONDS;

        // It's the stack that makes the executor assign threads to the submitted tasks in a last in, first out order
        return new ThreadPoolExecutor(0, maxPoolSize, keepAliveTime, timeUnit, new BlockingStack<>());
    }

    /**
     * Applies the referent of {@code ref} to {@code ifPresent} if the referent is not null. Otherwise, does nothing.
     *
     * @param ref       the {@link SoftReference} whose referent we apply to {@code ifPresent} if it's not null
     * @param ifPresent the {@link Consumer} to which we pass the non-null referent of {@code ref}
     * @param <T>       the type of {@code ref}'s referent
     */
    private static <T> void ifPresent(SoftReference<T> ref, Consumer<T> ifPresent) {
        T referent = ref.get();
        if (referent != null) {
            ifPresent.accept(referent);
        }
    }

    /**
     * Applies the referent of {@code ref} to {@code ifPresent} if the referent is not null. Otherwise,
     * calls {@code ifAbsent}.
     *
     * @param ref       the {@link SoftReference} whose referent we apply to {@code ifPresent} if it's not null
     * @param ifAbsent  if {@code ref}'s referent is null, we call this {@link Supplier} to produce a value of type
     *                  {@code Res}
     * @param ifPresent the {@link Function} to which we pass the non-null referent of {@code ref} to produce a value
     *                  of type {@code Res}
     * @param <T>       the type of {@code ref}'s referent
     * @param <Res>     the type of the value produced by both {@code ifAbsent} and {@code ifPresent}
     * @return a value of type {@code Res}
     */
    private static <T, Res> Res fold(SoftReference<T> ref, Supplier<Res> ifAbsent, Function<T, Res> ifPresent) {

        T referent = ref.get();
        final Res result;
        if (referent == null) {
            result = ifAbsent.get();
        } else {
            result = ifPresent.apply(referent);
        }
        return result;
    }

    /**
     * Gets the value associated with {@code key} from the {@code map} or an {@link Optional#empty()} if the {@code map}
     * doesn't contain the {@code key}.
     *
     * @param map we get the value from this {@link Map}
     * @param key we get the value associated with this {@code key}
     * @param <K> the {@code key}'s type
     * @param <V> the value's type
     * @return the value associated with {@code key} or an {@link Optional#empty()} if the {@code map}
     * doesn't contain the {@code key}
     */
    private static <K, V> Optional<V> getOpt(final Map<K, V> map, final K key) {
        return Optional.ofNullable(map.get(key));
    }

    /**
     * Applies the value of {@code optional} to {@code ifPresent} if it's present. Otherwise, calls {@code ifAbsent}.
     *
     * @param optional  the {@link Optional} whose value we apply to {@code ifPresent} if it's present
     * @param ifAbsent  if {@code optional}'s value is absent, we call this {@link Supplier} to produce a value of type
     *                  {@code Res}
     * @param ifPresent the {@link Function} to which we pass the value of {@code optional} to produce a value
     *                  of type {@code Res}
     * @param <T>       the type of {@code optional}'s value
     * @param <Res>     the type of the value produced by both {@code ifAbsent} and {@code ifPresent}
     * @return a value of type {@code Res}
     */
    private static <T, Res> Res fold(Optional<T> optional, Supplier<Res> ifAbsent, Function<T, Res> ifPresent) {
        final Res result;
        if (optional.isPresent()) {
            result = ifPresent.apply(optional.get());
        } else {
            result = ifAbsent.get();
        }
        return result;
    }

    /**
     * A stack that will block when it's full and clients try to add new elements to it.
     * Being a stack, it adds new elements in a last in first out manner: We put the most recently added elements at the
     * first position in the stack.
     * <p>
     * If its capacity is unspecified, it defaults to {@link Integer#MAX_VALUE}.
     *
     * @param <E> the elements inside the {@link BlockingStack}
     */
    public static class BlockingStack<E> extends LinkedBlockingDeque<E> {

        @Override
        public boolean offer(E e) {
            return offerFirst(e);
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return offerFirst(e, timeout, unit);
        }

        @Override
        public boolean add(E e) {
            return offerFirst(e);
        }

        @Override
        public void put(E e) throws InterruptedException {
            putFirst(e);
        }
    }
}

I welcome feedback on every aspect of the code, especially on concurrency bugs I may have missed.
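The comment in createExecutor says the BlockingStack is what makes the executor hand out work in LIFO order. Here is a minimal sketch to check that. The class name LifoExecutorDemo is hypothetical, BlockingStack is copied from the code above, and maxPoolSize is fixed at 4 for the demo. The sketch also records getLargestPoolSize(). As I read the ThreadPoolExecutor contract, threads beyond the core size are only started when the queue refuses a task. With a core size of 0 and a deque that never fills up, the executor therefore runs on a single worker. That may be less parallelism than the "maxPoolSize" comment suggests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; BlockingStack is copied from TaskStack
class LifoExecutorDemo {

    // Copy of TaskStack.BlockingStack: every insertion goes to the head of the deque
    static class BlockingStack<E> extends LinkedBlockingDeque<E> {
        @Override public boolean offer(E e) { return offerFirst(e); }
        @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return offerFirst(e, timeout, unit);
        }
        @Override public boolean add(E e) { return offerFirst(e); }
        @Override public void put(E e) throws InterruptedException { putFirst(e); }
    }

    static final class Result {
        final List<Integer> order;
        final int largestPoolSize;

        Result(List<Integer> order, int largestPoolSize) {
            this.order = order;
            this.largestPoolSize = largestPoolSize;
        }
    }

    static Result runDemo() {
        // Same configuration as TaskStack.createExecutor(), with maxPoolSize fixed at 4
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(0, 4, 3, TimeUnit.SECONDS, new BlockingStack<>());
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Keep the worker busy, and wait until it really is, so tasks 1..3 all sit in the queue
            executor.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();

            for (int i = 1; i <= 3; i++) {
                int id = i;
                executor.execute(() -> order.add(id));
            }
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return new Result(new ArrayList<>(order), executor.getLargestPoolSize());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Result result = runDemo();
        System.out.println("Execution order: " + result.order);
        System.out.println("Largest pool size: " + result.largestPoolSize);
    }
}
```

If several workers are actually wanted, one option is to pass the same value for core and maximum size and call allowCoreThreadTimeOut(true). With more than one worker, the order is only approximately LIFO, because tasks taken by different threads can finish in any order.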


1 Answer

Code Review user

Accepted answer

Posted on 2017-12-15 13:58:40

Based on your description, you have what I would consider a concurrency bug. It's an edge case, but most concurrency bugs are.

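You said:

"Also, if an input has already been added, I want to avoid processing it twice when the same input is added again later. That's why futOutput2 and futOutput3 in the example above reference the same CompletableFuture."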

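So you want to avoid processing twice, but must you avoid it in every case? You're using a ConcurrentHashMap, but it isn't clear whether multiple threads will be adding to the stack. If you are using multiple threads and processing twice is unacceptable, you may run into problems.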

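You aren't explicitly doing anything to protect your ConcurrentHashMap. Instead, you rely on its internal protection to make sure it doesn't end up in an inconsistent state. That's fine as far as it goes, but the map has no context about what you're doing with it. The lookup in getOpt and the later put are two separate operations, and another thread can run between them. As a result, the same item can be added to the TaskStack from different threads in a way that makes the task run twice. The following code demonstrates this: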

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class TSReviewApp {
    private class ThreadedEvaluator extends Thread {
        private final String valueToAdd;
        private final TaskStack<String, Integer> stack;
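        // volatile: run() writes this on another thread, and sleeping alone gives Go() no visibility guarantee
        private volatile CompletableFuture<Integer> result;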

        public ThreadedEvaluator(TaskStack<String, Integer> stack, String valueToAdd) {
            this.stack = stack;
            this.valueToAdd = valueToAdd;
        }

        public void run() {
            result = stack.add(valueToAdd);
        }

        public Integer result() throws ExecutionException, InterruptedException {
            return result.get();
        }

        public CompletableFuture<Integer> future() {
            return result;
        }
    }

    private static int GetLengthQuick(String str) {
        return str.length();
    }
    private static int GetLengthSlow(String str) {
        try {
            System.out.println("Thinking about: " + str);
            Thread.sleep(2000);
            System.out.println("Evaluated: " + str);
        } catch (InterruptedException ex) {
            System.out.println("Sleep interrupted");
        }
        return str.length();
    }

    private void Go() {
        TaskStack<String, Integer> stack = new TaskStack<>(TSReviewApp::GetLengthSlow);

        ThreadedEvaluator futOutput1 = new ThreadedEvaluator(stack,"How is it going?");
        ThreadedEvaluator duplicateFuture1 = new ThreadedEvaluator(stack,"duplicate input");
        ThreadedEvaluator duplicateFuture2 = new ThreadedEvaluator(stack,"duplicate input");
        ThreadedEvaluator duplicateFuture3 = new ThreadedEvaluator(stack,"duplicate input");

        futOutput1.start();
        duplicateFuture1.start();
        duplicateFuture2.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.out.println("MainMethod sleep exception");
            e.printStackTrace();
        }

        try {
            System.out.println(futOutput1.result());
            // If the same future were used, this would evaluate to *true*. It doesn't,
            // because two futures were created: one that is still in the map and one
            // that isn't.
            System.out.println("future1 == future 2? " + (duplicateFuture1.future() == duplicateFuture2.future()));
            System.out.println(duplicateFuture1.result());
            System.out.println(duplicateFuture2.result());
        } catch (Exception ex) {
            System.out.println(ex);
        }

        duplicateFuture3.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.out.println("MainMethod sleep 2 exception");
            e.printStackTrace();
        }

        try {
            System.out.println(duplicateFuture3.result());
            // Because one of the previous futures is still in the map,
            // one of the next two checks will print true
            System.out.println("future3 == future 1? " + (duplicateFuture1.future() == duplicateFuture3.future()));
            System.out.println("future3 == future 2? " + (duplicateFuture2.future() == duplicateFuture3.future()));
        } catch (Exception ex) {
            System.out.println(ex);
        }

    }

    public static void main(String[] args) {
        TSReviewApp app = new TSReviewApp();
        app.Go();
    }

}

It produces output like the following:

Thinking about: How is it going?
Evaluated: How is it going?
Thinking about: duplicate input
16
future1 == future 2? false
Evaluated: duplicate input
Thinking about: duplicate input
Evaluated: duplicate input
15
15
15
future3 == future 1? true
future3 == future 2? false

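The application processes two strings, "How is it going?" and "duplicate input". Notice in the output that adding the first three items (from different threads) results in three rounds of processing: "duplicate input" is processed twice. When "duplicate input" is added again later, the existing entry in the map is used, so the processing isn't repeated. How much does this matter to you?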

You can detect that you're in this situation from inside TaskStack by checking the value returned by the put call:

public CompletableFuture<R> add(T input) {

    return fold(getOpt(inputOutputMap, input),
            // We don't have a precomputed result for the input -> Start processing
            () -> {
                // Process the input on one of the threads provided by the executor
                CompletableFuture<R> futureOutput = CompletableFuture.supplyAsync(() -> func.apply(input), executor);

                // Wrap the future result in a soft reference and put it in the map so the result can be
                // garbage collected once no one else is referencing it
//---->
                SoftReference<CompletableFuture<R>> previousValue = inputOutputMap.put(input, new SoftReference<>(futureOutput));
                if(null != previousValue) {
                    System.out.println("Added '" + input + "' to map when it was already present");
                }
//---->

                return futureOutput;
            },
            // There's already a result for the input, but it may have been garbage collected
            existingSoftFuture -> fold(existingSoftFuture,
                    () -> {
                        // The result was already garbage collected ->
                        // Remove the input too and make this method reprocess the input
                        inputOutputMap.remove(input);
                        return add(input);
                    },
                    // We can return the result without reprocessing the input
                    future -> future
            )
    );
}

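Again, this may be acceptable for your use case. It may be fine to occasionally process an input more than once, you may only add from one thread, or duplicate inputs may be unlikely. If none of that holds, you may want to consider some kind of locking around the read and the write.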
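Here is a minimal sketch of that locking, assuming ConcurrentHashMap's per-key atomic compute is an acceptable stand-in for an explicit lock. The class AtomicAddSketch is hypothetical: a trimmed-down TaskStack with only add. It moves the lookup and the start of processing into a single compute call, so two threads adding an equal input can't both see it as absent. compute is documented to be atomic per key and should stay short. Here it only enqueues work via supplyAsync, which is the assumption that makes it safe to call there. The demo adds the same input from eight threads at once and counts how often the function runs.

```java
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical, trimmed-down variant of TaskStack that only shows an atomic add()
class AtomicAddSketch<T, R> {
    private final Function<T, R> func;
    private final ExecutorService executor;
    private final ConcurrentHashMap<T, SoftReference<CompletableFuture<R>>> inputOutputMap =
            new ConcurrentHashMap<>();

    AtomicAddSketch(Function<T, R> func, ExecutorService executor) {
        this.func = func;
        this.executor = executor;
    }

    CompletableFuture<R> add(T input) {
        // Strong reference to the future we return, captured inside compute() so that
        // the soft reference being cleared right after compute() can't lose it
        AtomicReference<CompletableFuture<R>> result = new AtomicReference<>();
        // compute() is atomic per key: no other thread can look up or replace this entry meanwhile
        inputOutputMap.compute(input, (key, existingRef) -> {
            CompletableFuture<R> existing = existingRef == null ? null : existingRef.get();
            if (existing != null) {
                result.set(existing);
                return existingRef;
            }
            // Absent, or the soft reference was cleared: start processing.
            // supplyAsync only enqueues the work, so compute() stays short.
            CompletableFuture<R> created = CompletableFuture.supplyAsync(() -> func.apply(key), executor);
            result.set(created);
            return new SoftReference<>(created);
        });
        return result.get();
    }

    /** Adds the same input from {@code callers} threads at once; returns {func calls, distinct futures, futures}. */
    static int[] runDemo(int callers) {
        AtomicInteger invocations = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        AtomicAddSketch<String, Integer> stack = new AtomicAddSketch<String, Integer>(s -> {
            invocations.incrementAndGet();
            return s.length();
        }, workers);

        List<CompletableFuture<Integer>> futures = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch go = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            Thread t = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    return;
                }
                futures.add(stack.add("duplicate input"));
            });
            threads.add(t);
            t.start();
        }
        go.countDown(); // release all callers together to maximise contention
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        futures.forEach(CompletableFuture::join); // wait for processing to finish
        workers.shutdown();
        long distinct = futures.stream().distinct().count(); // identity, since CompletableFuture keeps Object.equals
        return new int[] {invocations.get(), (int) distinct, futures.size()};
    }

    public static void main(String[] args) {
        int[] r = runDemo(8);
        System.out.println("func calls: " + r[0] + ", distinct futures: " + r[1] + " of " + r[2]);
    }
}
```

The trade-off is that every add for a given key briefly serializes on that key's map bin. For the short critical section above, that is usually cheap compared with running a long-running func twice.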

Votes: 2
Page content originally provided by Code Review; translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://codereview.stackexchange.com/questions/182579
