首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ListeningExecutorService创建的未来的添加/扩展行为

ListeningExecutorService创建的未来的添加/扩展行为
EN

Stack Overflow用户
提问于 2012-01-19 18:19:09
回答 5查看 4.7K关注 0票数 10

最终目标是根据Callable/Runnable参数的类型向ListenableFuture添加额外的行为。我想向每个未来方法添加额外的行为。(用例可以在文摘的javadoc和Goetz的Java并发在实践中的应用第7.1.7节中找到)

我有一个现有的ExecutorService,它覆盖了newTaskFor。它测试参数的类型并创建FutureTask的子类。这当然支持提交以及invokeAnyinvokeAll

如何对由ListenableFuture返回的ListeningExecutorService获得相同的效果?

换句话说,我可以把这些代码放在哪里?

代码语言:javascript
复制
if (callable instanceof SomeClass) {
   return new FutureTask<T>(callable) {
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("Canceling Task");
            return super.cancel(mayInterruptIfRunning);
        }
    };
} else {
    return new FutureTask<T>(callable);
}

,以便我的客户端可以使用

代码语言:javascript
复制
ListeningExecutorService executor = ...;
Collection<Callable> callables = ImmutableSet.of(new SomeClass());
List<Future<?>> futures = executor.invokeAll(callables);
for (Future<?> future : futures) {
    future.cancel(true);
}

失败的解决方案

下面列出了我已经尝试过的事情,以及它们不起作用的原因。

解A

MyExecutorService传给MoreExecutors.listeningDecorator

问题1:不幸的是,生成的ListeningExecutorService (一个AbstractListeningExecutorService)没有委托给ExecutorService方法,而是委托给执行者上的执行(可运行)方法。因此,从来不调用MyExecutorService上的MyExecutorService方法。

问题2:AbstractListeningExecutorService通过静态工厂方法创建可运行的( ListenableFutureTask),这是我无法扩展的。

解决方案B

newTaskFor中,通常创建MyRunnableFuture,然后用ListenableFutureTask包装它。

问题1:ListenableFutureTask的工厂方法不接受RunnableFuture,它们接受RunnableCallable。如果我将MyRunnableFuture作为可运行的传递,则生成的ListenableFutureTask只是调用run(),而不是调用任何Future方法(我的行为就是这样)。

问题2:即使它确实调用了我的Future方法,MyRunnableFuture也不是一个Callable,所以我必须在创建ListenableFutureTask时提供一个返回值。但我没有..。因此出现了Callable

解决方案C

让MyRunnableFuture扩展ListenableFutureTask而不是FutureTask

问题:ListenableFutureTask现在是最终版本(截至r10 / r11)。

解D

MyRunnableFuture扩展ForwardingListenableFuture并实现RunnableFuture。然后将SomeClass参数包装在一个ListenableFutureTask中,并从delegate()返回

问题:它挂起来了。我不太理解这个问题,无法解释它,但是这种配置会导致FutureTask.Sync中的死锁。

源代码:根据请求,这里是挂起的解决方案D的源代码:

代码语言:javascript
复制
import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.*;

/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceD extends ThreadPoolExecutor implements ListeningExecutorService {

    // ===== Test Harness =====

    private static interface SomeInterface {
        public String getName();
    }
    
    private static class SomeClass implements SomeInterface, Callable<Void>, Runnable {
        private final String name;

        private SomeClass(String name) {
            this.name = name;
        }

        public Void call() throws Exception {
            System.out.println("SomeClass.call");
            return null;
        }

        public void run() {
            System.out.println("SomeClass.run");
        }

        public String getName() {
            return name;
        }
    }

    private static class MyListener implements FutureCallback<Void> {
        public void onSuccess(Void result) {
            System.out.println("MyListener.onSuccess");
        }

        public void onFailure(Throwable t) {
            System.out.println("MyListener.onFailure");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Main.start");
        
        SomeClass someClass = new SomeClass("Main.someClass");
        
        ListeningExecutorService executor = new MyListeningExecutorServiceD();
        Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(someClass);
        List<Future<Void>> futures = executor.invokeAll(callables);
        
        for (Future<Void> future : futures) {
            Futures.addCallback((ListenableFuture<Void>) future, new MyListener());
            future.cancel(true);
        }
        
        System.out.println("Main.done");
    }

    // ===== Implementation =====

    private static class MyRunnableFutureD<T> extends ForwardingListenableFuture<T> implements RunnableFuture<T> {

        private final ListenableFuture<T> delegate;
        private final SomeInterface someClass;

        private MyRunnableFutureD(SomeInterface someClass, Runnable runnable, T value) {
            assert someClass == runnable;
            this.delegate = ListenableFutureTask.create(runnable, value);
            this.someClass = someClass;
        }
        
        private MyRunnableFutureD(SomeClass someClass, Callable<T> callable) {
            assert someClass == callable;
            this.delegate = ListenableFutureTask.create(callable);
            this.someClass = someClass;
        }

        @Override
        protected ListenableFuture<T> delegate() {
            return delegate;
        }

        public void run() {
            System.out.println("MyRunnableFuture.run");
            try {
                delegate.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("MyRunnableFuture.cancel " + someClass.getName());
            return super.cancel(mayInterruptIfRunning);
        }
    }

    public MyListeningExecutorServiceD() {
        // Same as Executors.newSingleThreadExecutor for now
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        if (runnable instanceof SomeClass) {
            return new MyRunnableFutureD<T>((SomeClass) runnable, runnable, value);
        } else {
            return new FutureTask<T>(runnable, value);
        }
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof SomeClass) {
            return new MyRunnableFutureD<T>((SomeClass) callable, callable);
        } else {
            return new FutureTask<T>(callable);
        }
    }

    /** Must override to supply co-variant return type */
    @Override
    public ListenableFuture<?> submit(Runnable task) {
        return (ListenableFuture<?>) super.submit(task);
    }

    /** Must override to supply co-variant return type */
    @Override
    public <T> ListenableFuture<T> submit(Runnable task, T result) {
        return (ListenableFuture<T>) super.submit(task, result);
    }

    /** Must override to supply co-variant return type */
    @Override
    public <T> ListenableFuture<T> submit(Callable<T> task) {
        return (ListenableFuture<T>) super.submit(task);
    }
}
EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2012-01-20 16:08:45

基于这个问题和我最近进行的一些其他讨论,我得出的结论是,RunnableFuture/FutureTask本质上具有误导性:很明显,您提交了一个Runnable,并且很明显,您得到了一个Future,显然底层的Thread需要一个Runnable。但是为什么一个类应该同时实现RunnableFuture呢?如果是的话,它要替换哪个Runnable呢?这已经够糟糕的了,但是我们引入了多个层次的执行者,事情真的失控了。

如果这里有一个解决方案,我认为需要将FutureTask作为AbstractExecutorService的实现细节来处理。我会专注于把问题分成两部分:

  • 我想有条件地修改返回的Future
  • 我想有条件地修改由executor服务运行的代码。)实际上我不确定这是否是这里的要求,但我会把它包括在内,以防万一。即使不是,它也可能有助于建立Runnable/Future的区别。)

(抱怨-马克抱怨)

代码语言:javascript
复制
class MyWrapperExecutor extends ForwardingListeningExecutorService {
  private final ExecutorService delegateExecutor;

  @Override public <T> ListenableFuture<T> submit(Callable<T> task) {
    if (callable instanceof SomeClass) {
      // Modify and submit Callable (or just submit the original Callable):
      ListenableFuture<T> delegateFuture =
          delegateExecutor.submit(new MyCallable(callable));
      // Modify Future:
      return new MyWrapperFuture<T>(delegateFuture);
    } else {
      return delegateExecutor.submit(callable);
    }
  }

  // etc.
}

那能行吗?

票数 3
EN

Stack Overflow用户

发布于 2012-01-19 18:37:28

根据ListeningExecutorService Javadoc的说法,你可以用MoreExecutors.listeningDecorator来装饰自己的ExecutorService。

因此,使用覆盖ExecutorService的newTaskFor,并使用上面的方法包装它。这对你有用吗?

更新

好吧,这就是我要做的:

1)如果你还没有下载番石榴资源。

2)不要使用listeningDecorator,而是让您的自定义ExecutorService实现ListeningExecutorService。

3) FutureTask的子类应该实现ListenableFuture,并从ListenableFutureTask复制代码,这非常简单,然后添加cancel方法覆盖。

4)通过将现有方法的返回方法更改为ListeningExecutorService,在自定义ExecutorService上实现ListenableFuture方法。

票数 1
EN

Stack Overflow用户

发布于 2012-01-19 19:58:57

我怀疑MoreExecutors.listeningDecorator(service)会修改基础service返回的期货。因此,如果您已经修改了基础ExecutorService,并且您只是在修饰Future方法,那么您可能就可以直接使用MoreExecutors.listeningDecorator(service)了。你试过看它是否有效吗?如果没有,你能提供更多的细节来解释为什么不起作用吗?

-更新

您编写的代码似乎将submit和invokeAll混为一谈。(具体来说,它调用invokeAll,它不委托提交.)

尽管如此,我很快得出的结论是,ListenableFutures和ListeningExecutorService不应该被这样滥用。在没有死锁的情况下实现这些东西是一个真正的难题,如果你能避免的话,你就不应该这么做。

我怀疑可能值得提出一个解决方案A不起作用的番石榴问题。但我一直在努力想办法做你正在做的事,只是觉得这样做是不对的。

你能解释一下你为什么要回报不同的未来吗?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/8931215

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档