首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >可关闭BlockingQueue

可关闭BlockingQueue
EN

Code Review用户
提问于 2012-11-09 22:38:23
回答 2查看 979关注 0票数 3

我正在处理遗留代码,特别是一种BoundedBlockingQueue (主要用作不同线程之间的管道)。由于它在系统中大量使用,并且当前的实现功能具有完全同步的方法和等待/通知机制,所以我尝试使用java 5并发实用程序重写它。下面是我的结果,这在(天真)测试中要快得多,而且我还没有遇到明显的线程问题(但是……(:)

由于这是遗留代码,我不能简单地切换到BlockingQueue实现,但必须支持阻塞读、写和查看方法。另一个复杂因素是需要关闭管道,即作者或读者可以决定关闭管道。然后,读者应该能够清空管道,而作者不应该写得更多。

我将感谢任何建设性的批评,特别是关于我的方法的正确性和对优化的暗示。

代码语言:javascript
复制
public class ConcurrentBufferedPipe implements Pipe {
    /** Possible states of a pipe. ERROR and CLOSED are final states. */
    private enum State {
        OPEN, CLOSED, ERROR;
    }

    /* 
     * Relies on the thread-safety of the used BlockingQueue, the volatile 
     * semantics on the state variable and state invariants of the Pipe 
     * Interface, namely:
     *  - a closed or erroneous pipe will never be reopened
     *  - as long as blocks are available, readers are permitted to continue
     *      reading - even if the pipe was closed or set to error state
     *  - it is acceptable that a write happens while another thread closes the 
     *      pipe
     *  Access to the blocking queue is controlled by two semaphores, one for
     *  writers and one for readers. They essentially represent the currently
     *  available blocks or space.
     */

    /* waiting times above this timeout are unlikely and indicate starvation */
    private static final long TIMEOUT = 60;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;

    private final String name;
    private final BlockingQueue buffer;
    private final int size;

    /* concurrency tools */
    private volatile State state;
    private final Semaphore availableBlocks;
    private final Semaphore availableSpace;

    public ConcurrentBufferedPipe(final String name, final int size) {
        super();
        this.name = name;
        this.size = size;
        this.buffer = new LinkedBlockingQueue(size);
        this.state = State.OPEN;
        this.availableBlocks = new Semaphore(size);
        this.availableBlocks.drainPermits();
        this.availableSpace = new Semaphore(size);
    }

    @Override
    public Object read() throws PipeIOException, PipeTerminatedException,
            DataError {
        aquireOrFail(this.availableBlocks);
        final Object head = buffer.poll();
        if (head == null) { // indicates a closed or error state
            assert state != State.OPEN;
            this.availableBlocks.release();
            return closedMarkerOrError();
        } else {
            this.availableSpace.release();
        }
        assert head != null;
        return head;
    }

    /**
     * {@inheritDoc}
     * 
     * @throws DataError
     *             if the pipe is empty and was closed due to an error
     */
    @Override
    public Object peek() throws PipeIOException, PipeTerminatedException,
            DataError {
        aquireOrFail(this.availableBlocks);
        final Object head = buffer.peek();
        this.availableBlocks.release();
        if (head == null) {
            assert state != State.OPEN;
            return closedMarkerOrError();
        }
        assert head != null;
        return head;
    }

    /**
     * {@inheritDoc}
     * 
     * This implementation will also fail with a {@link PipeClosedException} if
     * the pipe was closed by a writer.
     * 
     */
    @Override
    public void write(final Object block) throws PipeClosedException,
            PipeIOException, PipeTerminatedException {
        aquireOrFail(this.availableSpace);
        boolean hasWroteBlock = false;
        if (state == State.OPEN) {
            hasWroteBlock = buffer.offer(block);
        } else {
            this.availableSpace.release();
            throw new PipeClosedException();
        }
        this.availableBlocks.release();
        assert hasWroteBlock;
    }

    @Override
    public void closeForReading() {
        state = State.CLOSED;
        wakeAll();
        buffer.clear();
    }

    @Override
    public void closeForWriting() {
        state = State.CLOSED;
        wakeAll();
    }

    @Override
    public void closeForWritingDueToError() {
        state = State.ERROR;
        wakeAll();
    }

    /**
     * Safely tries to acquire a permission from a semaphore.
     * 
     * @param resource
     *            holds permissions
     * @throws PipeTerminatedException
     *             if the current thread is interrupted before or while
     *             acquiring the permission or acquisition times out
     */
    private void aquireOrFail(final Semaphore resource)
            throws PipeTerminatedException {
        try {
            final boolean aquired = resource.tryAcquire(TIMEOUT, UNIT);
            if (!aquired) { // indicates time out
                throw new PipeTerminatedException(name);
            }
        } catch (final InterruptedException e) {
            throw new PipeTerminatedException(name);
        }
    }

    /**
     * Depending on final state of pipe returns appropriate marker value. May
     * only be called if this pipe is NOT open.
     * 
     * @return NO_MORE_DATA marker if pipe is closed
     * @throws DataError
     *             if pipe is in error
     */
    private Object closedMarkerOrError() throws DataError {
        final State state = this.state;
        if (state == State.ERROR) {
            throw new DataError();
        }
        assert state == State.CLOSED;
        return ControlBlock.NO_MORE_DATA;
    }

    /**
     * Releases all reader / writer limits. May only be called after setting the
     * pipe to a final state (ERROR or CLOSED), as it ultimately corrupts the
     * invariants guarded by the used semaphores.
     */
    private void wakeAll() {
        assert this.state != State.OPEN;
        this.availableBlocks.release(size);
        this.availableSpace.release(size);
    }
}

谢谢您的投入!

EN

回答 2

Code Review用户

回答已采纳

发布于 2012-11-10 12:59:39

  1. 读写方法需要3个同步操作(2个信号量操作和一个对LinkedBlockingQueue的访问)。如果您使用同步方法(或使用ReentrantLock进行锁定)和非同步底层队列,那么每个读/写/peek方法只需要一个同步操作。
  2. LinkedBlockingQueue为放入队列的每一项创建额外的包装对象(链接)。使用java.util.ArrayDeque(大小)来避免多余的对象创建。
票数 1
EN

Code Review用户

发布于 2012-11-14 11:26:59

将所有这些调用移到“最后”部分。

代码语言:javascript
复制
this.availableSpace.release();
票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/18427

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档