首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >java中并发队列消耗的谜题

java中并发队列消耗的谜题
EN

Stack Overflow用户
提问于 2015-03-28 22:49:33
回答 2查看 946关注 0票数 0

为了使用Java OpenCV加快图像处理速度,我尝试使用并行流来使用OpenCV <Mat>队列。如果我对算法进行计时,并计算队列中剩下的内容,则在并行处理流时会得到不连贯的结果,但顺序计算结果是正确的。因为我使用了ConcurrentLinkedQueue(),所以我认为我对线程安全和异步性都很好,但显然不是。有谁知道如何绕过这个问题吗?

备注:

  • 元素在使用过程中仍被放入队列中。
  • 我正在运行一个4真(8虚拟)核心处理器。

有顺序流的结果:

帧集合起始大小(=生产):1455 帧集端尺寸(=生产-消耗):1360 算法运行后产生的列表大小(=消耗):100 算法: 6956 ms

平行流的结果:

帧集合起始大小(=生产):1455 帧集端尺寸(=生产-消耗):440 算法运行后产生的列表大小(=消耗):100 算法: 9242 ms

我的代码:

代码语言:javascript
复制
public class OvusculeTestConcurrent {

    public final static ConcurrentLinkedQueue<Mat> frameCollection = new ConcurrentLinkedQueue<Mat>();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
        final String path = "C:\\Users\\Raoul\\workspace\\aorta2\\ressource\\artery_src_for_dual.avi";
        long startAlgoTime = System.nanoTime();

        // Constitute a frame collection in async mode
        Capture cap = new Capture(path, frameCollection);
        new Thread(cap).start();
        Thread.sleep(3000); //leaves time to accumulate frames
        System.out.println("frame collection start size (=production): "+frameCollection.size());

        //Consumes the current queue in parallel/sequential
        List<ImagePlus> lm = Stream.generate(() -> {
                return frameCollection.poll();
            })
            .parallel() // comment to disable parallel computing
            .limit(100L)
            .map(img -> utils.PrepareImage(img,
                                    new Point(300, 250),
                                    new Point(450, 250),
                                    new Point(400, 400),
                                    0.25))
            .collect(Collectors.toList());

        //timing & printing the results
        long endAlgoTime = System.nanoTime();
        long algoDuration = (endAlgoTime - startAlgoTime)/1_000_000;  //divide by 1_000_000 to get milliseconds.
        System.out.println("frame collection end size (=production - consumption): "+frameCollection.size());
        System.out.println("resulting list size after algorithm run (=consumption): "+lm.size());
        System.out.println("algorithm: "+algoDuration+" ms");
        System.exit(0);
    }

}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-04-23 04:37:31

这段代码引起了我的注意。

首先,使用Stream.generate创建流是正确的。它比只调用queue.stream()更好,它将返回一个仅包含队列当前内容的流。您说在处理过程中会将元素添加到队列中,这样就不能工作了。

一个问题是生成这样的流的代码(为清晰性而编辑):

代码语言:javascript
复制
Stream.generate(() -> queue.poll())

问题在于poll方法,该方法定义如下:

检索并移除此队列的头,如果此队列为空,则返回null。

可能是,当流并行运行时,流的线程可以比生成元素并插入队列时更快地排出队列。如果发生这种情况,队列将在某个点空出,流将被poll返回的poll元素填充。

我不知道当传递null时PrepareImage会做什么,但是它似乎传递到输出中,这就是为什么目标列表中总是有100个元素。

另一种方法是使用BlockingQueue实现并使用take方法,如下所示

代码语言:javascript
复制
Stream.generate(() -> queue.take())

这将避免向溪流中注入明渠。我不确定您应该使用哪个BlockingQueue实现,但我建议您研究一个大小受限的实现。如果您的生产者超过了您的使用者,一个无限的队列可能会扩展到填充所有可用内存。

不幸的是,BlockingQueue.take()抛出了InterruptedException,因此不能在简单的lambda中使用它。你得想办法在中断的时候做些什么。或许还会有个假人元素什么的。

另一个问题是,limit方法对下游传递的元素数量施加了限制,但在并行流中,多线程可能会机会主义地从流中提取更多的元素进行处理。这些将由limit操作缓冲,直到达到其限制为止,此时流处理将终止。从流源中提取并在达到限制时缓冲的任何元素都会被简单地丢弃。这可能是为什么从队列中提取了超过1,000个元素,但结果列表中只有100个元素。

(但即使在顺序的情况下,这些数字也不加在一起。我不认为会发生同样的事情,当达到极限时,缓冲元素会被丢弃。也许是因为加工过程中产生了额外的元素吧?)

如果您可以忍受被丢弃的元素,那么由queue.take()提供的并行流可能会工作;否则,需要采用不同的方法。

票数 1
EN

Stack Overflow用户

发布于 2016-12-30 12:37:59

我也有过类似的问题,在网上找不到解决办法。想出我自己的Spliterator,看起来SpliteratorWithUnknownSize不支持并行性(没有对此进行研究,只是看到所有处理都是在同一个线程中完成的)。下面是:

代码语言:javascript
复制
public class QueueDrainSpliterator<T> implements Spliterator<T> {

    private final BlockingQueue<T> elements;

    public QueueDrainSpliterator(BlockingQueue<T> elements) {
        this.elements = elements;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        T el = elements.poll();
        if (el != null) {
            action.accept(el);
            return true;
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (!elements.isEmpty()) {
            BlockingQueue<T> split = new LinkedBlockingQueue<T>();
            elements.drainTo(split, (int) Math.ceil(elements.size() / 2d));
            return new QueueDrainSpliterator<T>(split);
        }
        return null;
    }

    @Override
    public long estimateSize() {
        return elements.size();
    }

    @Override
    public int characteristics() {
        return Spliterator.NONNULL | Spliterator.CONCURRENT;
    }
}

用法示例:

代码语言:javascript
复制
StreamSupport.stream(new QueueDrainSpliterator<>(events), true)
                     .forEach(consumer);

当资源处于队列中时,它将遍历资源。如果队列是空的,它就会完成。如果需要等待生成元素,可以尝试使用elements.poll(long timeout, TimeUnit unit)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29323538

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档