首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >柔性CountDownLatch?

柔性CountDownLatch?
EN

Stack Overflow用户
提问于 2009-10-28 09:52:16
回答 6查看 9.6K关注 0票数 28

我现在遇到了两次问题,一个生产者线程生成N个工作项,将它们提交到ExecutorService,然后需要等待直到所有N项都被处理了。

警告

  • N是事先不知道的。如果是的话,我只需创建一个CountDownLatch,然后让生产者线程await(),直到所有工作完成为止。
  • 使用CompletionService是不合适的,因为虽然我的生产者线程需要阻塞(即通过调用take()),但没有办法发出所有工作都已完成的信号,从而导致生产者线程停止等待。

我目前最喜欢的解决方案是使用整数计数器,并在提交工作项时使用增量,并在处理工作项时将其递减到。在对所有N个任务进行子处理之后,我的生产者线程将需要等待锁,检查counter == 0是否在任何时候被通知。如果使用者线程减少了计数器,并且新值为0,则需要通知生产者。

是否有更好的方法来解决这个问题,或者我应该在java.util.concurrent中使用一个合适的构造,而不是“滚动我自己的”?

提前谢谢。

EN

回答 6

Stack Overflow用户

回答已采纳

发布于 2009-10-28 12:42:28

java.util.concurrent.Phaser看起来很适合你。它计划在Java 7中发布,但最稳定的版本可以在jsr166的兴趣小组网站上找到。

相位器是一个被美化的循环屏障。你可以注册N个缔约方,当你准备好等待他们在特定阶段前进。

关于它将如何工作的一个快速例子:

代码语言:javascript
复制
final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}
票数 31
EN

Stack Overflow用户

发布于 2009-10-28 10:08:07

当然,您可以使用由CountDownLatch保护的AtomicReference,这样您的任务就可以这样包装:

代码语言:javascript
复制
public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

注意到,这些任务运行他们的工作,然后旋转直到设置了倒计时(也就是知道任务的总数)。一旦清点完毕,他们就会倒计时并退出.这些文件按以下方式提交:

代码语言:javascript
复制
AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

当您知道已经创建了多少任务时,之后创建/提交您的工作的要点

代码语言:javascript
复制
latch.set(new CountDownLatch(nTasks));
latch.get().await();
票数 2
EN

Stack Overflow用户

发布于 2009-10-28 10:10:39

我用过ExecutorCompletionService来做这样的事情:

代码语言:javascript
复制
ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

这包括保持已提交任务的队列,因此如果列表任意长,则您的方法可能更好。

但是,请确保使用AtomicInteger进行协调,这样您就能够在一个线程中增加它,并在工作线程中减少它。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/1636194

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档