首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >“关闭”阻塞队列

“关闭”阻塞队列
EN

Stack Overflow用户
提问于 2011-03-21 13:41:40
回答 10查看 21.8K关注 0票数 32

我在一个非常简单的生产者-消费者场景中使用java.util.concurrent.BlockingQueue。例如,这个伪代码描述了消费者部分:

代码语言:javascript
复制
class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

到目前一切尚好。在阻塞队列的javadoc中,我读到:

BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示不会添加更多的项。这些特性的需求和使用往往依赖于实现。例如,一个常见的策略是生产者插入特定的流结束或有毒对象,当消费者采取相应的解释。

不幸的是,由于使用中的泛型和ComplexObject的性质,将“毒对象”推到队列中并不容易。因此,在我的场景中,这种“常见的策略”并不是很方便。

我的问题是:我还能用什么好的策略/模式来“关闭”队列呢?

谢谢!

EN

回答 10

Stack Overflow用户

回答已采纳

发布于 2011-03-21 13:44:26

如果您有使用者线程的句柄,您可以中断它。用你给出的代码,这将杀死消费者。我不希望生产者有这种情况;它可能不得不以某种方式回调程序控制器,让它知道它已经完成了。然后控制器会中断使用者线程。

你总是可以在服从中断之前完成工作。例如:

代码语言:javascript
复制
class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

实际上,我宁愿这样做,也不愿在某种程度上毒害排队的人。如果您想要杀死线程,请让线程杀死自己。

(很高兴看到有人恰当地处理了InterruptedException。)

在这里,对于中断的处理似乎有一些争论。首先,我希望每个人都能读这篇文章

现在,有了这样的理解,没有人真正读到这句话,这就是交易。只有当InterruptedException在中断时处于阻塞状态时,线程才会接收它。在这种情况下,Thread.interrupted()将返回false。如果没有阻塞,它将不会接收到此异常,相反,Thread.interrupted()将返回true。因此,无论如何,您的循环保护都应该检查Thread.interrupted(),否则可能会丢失线程的中断。

因此,由于无论发生什么,您都要检查Thread.interrupted(),并且必须捕获InterruptedException (并且应该处理它,即使不是被迫的),所以现在有两个处理相同事件的代码区域,线程中断。处理此问题的一种方法是将它们规范化为一个条件,这意味着要么布尔状态检查可以抛出异常,要么异常可以设置布尔状态。我选择后一种。

编辑:注意到静态Thread#interrupted方法清除当前线程的中断状态。

票数 21
EN

Stack Overflow用户

发布于 2011-03-21 15:07:50

另一个让这变得简单的想法:

代码语言:javascript
复制
class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

然后使用BlockingQueue<QueueableComplexObject>作为队列。当您希望结束队列的处理时,执行queue.offer(NullComplexObject.INSTANCE)。在消费者方面,做

代码语言:javascript
复制
boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

不需要instanceof,您也不必构建一个伪ComplexObject,这可能会因其实现而变得昂贵/困难。

票数 13
EN

Stack Overflow用户

发布于 2011-03-21 14:41:33

另一种选择是用ExecutorService包装正在进行的处理,并让ExecutorService自己控制是否将作业添加到队列中。

基本上,您可以利用ExecutorService.shutdown(),它在被调用时不允许执行程序处理更多的任务。

我不知道您目前是如何在示例中向QueueConsumer提交任务的。我假设您有某种submit()方法,并在示例中使用了类似的方法。

代码语言:javascript
复制
import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

这个例子只是对java.util.concurrent包所提供的东西的即兴说明;可能会对它进行一些优化(例如,QueueConsumer作为它自己的类可能甚至没有必要;您可以只为提交任务的任何生产者提供ExecutorService )。

深入研究java.util.concurrent包(从上面的一些链接开始)。您可能会发现,它为您所要做的工作提供了许多很好的选择,您甚至不必担心对工作队列的管理。

票数 9
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/5378391

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档