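I'm working on legacy code, specifically a kind of BoundedBlockingQueue (mainly used as a pipe between different threads). Since it is used heavily throughout the system, and the current implementation features fully synchronized methods and a wait/notify mechanism, I tried to rewrite it using the Java 5 concurrency utilities. Below is my result, which is much faster in (naive) tests, and I haven't run into any obvious threading problems yet (but ... :)).
Since this is legacy code, I cannot simply switch to a BlockingQueue implementation, but have to support blocking read, write and peek methods. Another complication is the need to close the pipe, i.e. either a writer or a reader may decide to close it. Readers should then still be able to drain the pipe, while writers must not write any more.
I would appreciate any constructive criticism, especially regarding the correctness of my approach and hints for optimization.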
public class ConcurrentBufferedPipe implements Pipe {

    /** Possible states of a pipe. ERROR and CLOSED are final states. */
    private enum State {
        OPEN, CLOSED, ERROR;
    }

    /*
     * Relies on the thread-safety of the used BlockingQueue, the volatile
     * semantics on the state variable and state invariants of the Pipe
     * Interface, namely:
     * - a closed or erroneous pipe will never be reopened
     * - as long as blocks are available, readers are permitted to continue
     *   reading - even if the pipe was closed or set to error state
     * - it is acceptable that a write happens while another thread closes the
     *   pipe
     * Access to the blocking queue is controlled by two semaphores, one for
     * writers and one for readers. They essentially represent the currently
     * available blocks or space.
     */

    /* waiting times above this timeout are unlikely and indicate starvation */
    private static final long TIMEOUT = 60;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;

    private final String name;
    private final BlockingQueue buffer;
    private final int size;

    /* concurrency tools */
    private volatile State state;
    private final Semaphore availableBlocks;
    private final Semaphore availableSpace;

    public ConcurrentBufferedPipe(final String name, final int size) {
        super();
        this.name = name;
        this.size = size;
        this.buffer = new LinkedBlockingQueue(size);
        this.state = State.OPEN;
        this.availableBlocks = new Semaphore(size);
        this.availableBlocks.drainPermits();
        this.availableSpace = new Semaphore(size);
    }

    @Override
    public Object read() throws PipeIOException, PipeTerminatedException,
            DataError {
        aquireOrFail(this.availableBlocks);
        final Object head = buffer.poll();
        if (head == null) { // indicates a closed or error state
            assert state != State.OPEN;
            this.availableBlocks.release();
            return closedMarkerOrError();
        } else {
            this.availableSpace.release();
        }
        assert head != null;
        return head;
    }

    /**
     * {@inheritDoc}
     *
     * @throws DataError
     *             if the pipe is empty and was closed due to an error
     */
    @Override
    public Object peek() throws PipeIOException, PipeTerminatedException,
            DataError {
        aquireOrFail(this.availableBlocks);
        final Object head = buffer.peek();
        this.availableBlocks.release();
        if (head == null) {
            assert state != State.OPEN;
            return closedMarkerOrError();
        }
        assert head != null;
        return head;
    }

    /**
     * {@inheritDoc}
     *
     * This implementation will also fail with a {@link PipeClosedException} if
     * the pipe was closed by a writer.
     *
     */
    @Override
    public void write(final Object block) throws PipeClosedException,
            PipeIOException, PipeTerminatedException {
        aquireOrFail(this.availableSpace);
        boolean hasWroteBlock = false;
        if (state == State.OPEN) {
            hasWroteBlock = buffer.offer(block);
        } else {
            this.availableSpace.release();
            throw new PipeClosedException();
        }
        this.availableBlocks.release();
        assert hasWroteBlock;
    }

    @Override
    public void closeForReading() {
        state = State.CLOSED;
        wakeAll();
        buffer.clear();
    }

    @Override
    public void closeForWriting() {
        state = State.CLOSED;
        wakeAll();
    }

    @Override
    public void closeForWritingDueToError() {
        state = State.ERROR;
        wakeAll();
    }

    /**
     * Safely tries to acquire a permission from a semaphore.
     *
     * @param resource
     *            holds permissions
     * @throws PipeTerminatedException
     *             if the current thread is interrupted before or while
     *             acquiring the permission or acquisition times out
     */
    private void aquireOrFail(final Semaphore resource)
            throws PipeTerminatedException {
        try {
            final boolean aquired = resource.tryAcquire(TIMEOUT, UNIT);
            if (!aquired) { // indicates time out
                throw new PipeTerminatedException(name);
            }
        } catch (final InterruptedException e) {
            throw new PipeTerminatedException(name);
        }
    }

    /**
     * Depending on final state of pipe returns appropriate marker value. May
     * only be called if this pipe is NOT open.
     *
     * @return NO_MORE_DATA marker if pipe is closed
     * @throws DataError
     *             if pipe is in error
     */
    private Object closedMarkerOrError() throws DataError {
        final State state = this.state;
        if (state == State.ERROR) {
            throw new DataError();
        }
        assert state == State.CLOSED;
        return ControlBlock.NO_MORE_DATA;
    }

    /**
     * Releases all reader / writer limits. May only be called after setting the
     * pipe to a final state (ERROR or CLOSED), as it ultimately corrupts the
     * invariants guarded by the used semaphores.
     */
    private void wakeAll() {
        assert this.state != State.OPEN;
        this.availableBlocks.release(size);
        this.availableSpace.release(size);
    }
}
Thanks for your input!
Posted on 2012-11-10 12:59:39
Posted on 2012-11-14 11:26:59
Move all of these calls into a "finally" block:
this.availableSpace.release();
https://codereview.stackexchange.com/questions/18427
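To make the "finally" suggestion concrete, here is a minimal sketch of how the permit hand-back could look. It is not the original class: `FinallyReleaseSketch` is a hypothetical, simplified stand-in in which a boolean `open` flag replaces the State enum, `IllegalStateException` replaces the pipe's own exceptions, `null` replaces the NO_MORE_DATA marker, `peek` is left out, and the timeout is shortened to one second. The only point it illustrates is that the `release()` calls sit in a `finally` block, so exactly one permit is returned on every exit path, including the one that throws.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for the pipe in the question. */
final class FinallyReleaseSketch {
    private final int size;
    private final BlockingQueue<Object> buffer;
    private final Semaphore availableBlocks = new Semaphore(0);
    private final Semaphore availableSpace;
    private volatile boolean open = true; // assumption: replaces the State enum

    FinallyReleaseSketch(final int size) {
        this.size = size;
        this.buffer = new LinkedBlockingQueue<Object>(size);
        this.availableSpace = new Semaphore(size);
    }

    void write(final Object block) {
        acquireOrFail(availableSpace);
        boolean wrote = false;
        try {
            if (!open) {
                throw new IllegalStateException("pipe closed");
            }
            wrote = buffer.offer(block);
        } finally {
            // Runs on every exit path, including the exception above.
            if (wrote) {
                availableBlocks.release(); // a new block is readable
            } else {
                availableSpace.release();  // hand the unused permit back
            }
        }
    }

    /** Returns the head block, or null (stand-in for NO_MORE_DATA) if empty. */
    Object read() {
        acquireOrFail(availableBlocks);
        Object head = null;
        try {
            head = buffer.poll();
            return head;
        } finally {
            if (head == null) {
                availableBlocks.release(); // nothing consumed, restore permit
            } else {
                availableSpace.release();  // one slot became free
            }
        }
    }

    /** Closes the pipe and wakes all waiters, as wakeAll() does in the question. */
    void close() {
        open = false;
        availableBlocks.release(size);
        availableSpace.release(size);
    }

    int spacePermits() { return availableSpace.availablePermits(); }

    int blockPermits() { return availableBlocks.availablePermits(); }

    private static void acquireOrFail(final Semaphore resource) {
        try {
            // Assumption: 1 second instead of the 60 seconds in the question.
            if (!resource.tryAcquire(1, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }

    public static void main(final String[] args) {
        final FinallyReleaseSketch pipe = new FinallyReleaseSketch(2);
        pipe.write("a");
        System.out.println("read: " + pipe.read());
        pipe.close();
        final int before = pipe.spacePermits();
        try {
            pipe.write("b");
        } catch (final IllegalStateException e) {
            System.out.println("write rejected: " + e.getMessage());
        }
        System.out.println("permit restored: " + (before == pipe.spacePermits()));
    }
}
```

With this shape, a rejected write leaves the number of space permits unchanged, and an empty read leaves the number of block permits unchanged, without each branch having to remember its own `release()` call.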