我有几个异步任务在运行,我需要等待,直到其中至少有一个任务完成(将来我可能需要等待out,直到N个任务中的M个任务完成)。目前它们被表示为Future,所以我需要像这样的东西
/**
* Blocks current thread until one of specified futures is done and returns it.
*/
public static <T> Future<T> waitForAny(Collection<Future<T>> futures)
throws AllFuturesFailedException有像这样的东西吗?或者任何类似的东西,对未来来说不是必须的。目前,我循环收集期货,检查其中一个是否完成,然后休眠一段时间,然后再次检查。这看起来不是最好的解决方案,因为如果我长时间睡眠,那么就会增加不必要的延迟,如果我短时间睡眠,那么它可能会影响性能。
我可以试着用
new CountDownLatch(1)并在任务完成并完成时减少倒计时
countdown.await(),但我发现只有在我控制未来创造的情况下才有可能。这是可能的,但需要重新设计系统,因为目前任务创建的逻辑(向ExecutorService发送Callable )与等待哪个未来的决策是分开的。我也可以重写
<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)并创建RunnableFuture的自定义实现,使其能够附加侦听器,以便在任务完成时收到通知,然后将此类侦听器附加到所需的任务并使用CountDownLatch,但这意味着我必须为我使用的每个ExecutorService覆盖newTaskFor -并且可能会有不扩展AbstractExecutorService的实现。出于同样的目的,我也可以尝试包装给定的ExecutorService,但之后我必须装饰所有产生Futures的方法。
所有这些解决方案都可能有效,但似乎非常不自然。看起来我错过了一些简单的东西,比如
WaitHandle.WaitAny(WaitHandle[] waitHandles)在c#中。对于这类问题,有什么众所周知的解决方案吗?
更新:
最初,我根本无法访问Future creation,因此没有优雅的解决方案。在重新设计系统后,我可以访问Future creation,并能够将countDownLatch.countdown()添加到执行过程中,然后我可以使用countDownLatch.await(),一切都很正常。感谢你的其他回答,我不知道ExecutorCompletionService,它确实可以在类似的任务中有所帮助,但在这种特殊情况下,它不能使用,因为一些未来是在没有任何执行器的情况下创建的-实际任务通过网络发送到另一台服务器,远程完成,并收到完成通知。
发布于 2008-09-22 21:18:04
据我所知,Java没有类似于WaitHandle.WaitAny方法的结构。
在我看来,这可以通过"WaitableFuture“装饰器来实现:
public WaitableFuture<T>
extends Future<T>
{
private CountDownLatch countDownLatch;
WaitableFuture(CountDownLatch countDownLatch)
{
super();
this.countDownLatch = countDownLatch;
}
void doTask()
{
super.doTask();
this.countDownLatch.countDown();
}
}虽然这只有在它可以插入到执行代码之前时才能起作用,否则执行代码就不会有新的doTask()方法。但是,如果您不能在执行之前以某种方式获得对Future对象的控制,那么我真的看不到在没有轮询的情况下这样做的方法。
或者,如果未来总是在它自己的线程中运行,你可以以某种方式获得那个线程。然后,您可以生成一个新线程来连接其他线程,然后在连接返回后处理等待机制……这将是非常丑陋的,并将导致大量的开销。如果一些Future对象没有完成,你可能会有很多被阻塞的线程,这取决于死线程。如果您不小心,这可能会泄漏内存和系统资源。
/**
* Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
*/
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);
foreach(Thread thread in threads)
{
(new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
}
countDownLatch.await();
}
class JoinThreadHelper
implements Runnable
{
Thread thread;
CountDownLatch countDownLatch;
JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
{
this.thread = thread;
this.countDownLatch = countDownLatch;
}
void run()
{
this.thread.join();
this.countDownLatch.countDown();
}
}发布于 2008-09-22 23:42:28
简单,请查看ExecutorCompletionService。
发布于 2008-09-23 03:45:42
ExecutorService.invokeAny
https://stackoverflow.com/questions/117690
复制相似问题