我正在编写NIO2服务器,我需要在AsynchronousSocketChannel上进行异步读取操作,每个操作包括读取一个整数,并进一步读取与此整数相等的相同通道字节数。问题是,当我将两个或多个CompletionHandler的处理程序放在一起(因为有多个读取操作的请求),并且首先触发这些处理程序时,我在第一个处理程序的complete()方法中的进一步读取代码不能正常工作,因为当通道上有信息时,第二个处理程序就会立即被触发。在没有complete()的情况下,如何阻塞通道直到进一步阅读Future完成?我不能使用未来,因为我需要将处理程序放在套接字上,然后传递到其他任务。
for (final Map.Entry<String, AsynchronousSocketChannel> entry : ipSocketTable.entrySet()) {
try {
final AsynchronousSocketChannel outSocket = entry.getValue();
synchronized (outSocket) {
final ByteBuffer buf = ByteBuffer.allocateDirect(9);
outSocket.read(buf, null, new DataServerResponseHandler(buf, outSocket, resultTable, server, entry.getKey()));
}
} catch (Exception e) {
}
}下面是DataServerResponseHandler类:
class DataServerResponseHandler implements CompletionHandler<Integer, Void> {
private ConcurrentHashMap<String, Boolean> resultTable = null;
private AsynchronousSocketChannel channel = null;
private TcpServer server;
private String ip;
private ByteBuffer msg;
public DataServerResponseHandler(ByteBuffer msg, AsynchronousSocketChannel channel,
ConcurrentHashMap<String, Boolean> resultTable, TcpServer server, String ip) {
this.msg = msg;
this.channel = channel;
this.resultTable = resultTable;
this.server = server;
this.ip = ip;
}
@Override
public void completed(Integer result, Void attachment) {
try {
msg.rewind();
int resultCode = msg.get() & 0xff;
int ipOne = msg.get() & 0xff;
int ipTwo = msg.get() & 0xff;
int ipThree = msg.get() & 0xff;
int ipFour = msg.get() & 0xff;
int length = msg.getInt();
msg.rewind();
ByteBuffer buf = ByteBuffer.allocateDirect(length);
channel.read(buf).get();
buf.rewind();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
}发布于 2015-10-09 13:57:57
这个代码有几个问题。
首先,read并不保证它将读取所有剩余的字节,而是在读取至少一个字节时调用完成处理程序。因此,您必须检查缓冲区的位置,并重新调用读取,直到有9个字节的头或有效载荷的长度。
if (msg.position() < 9) {
channel.read(msg, null, this);
return;
}对于第二部分,为了继续异步方法,再次运行处理程序。您可以创建新的,专门处理有效负载或重用现有的负载,您必须记住状态:
switch (state) {
case READ_HEADER:
if (msg.remaining() > 0) {
channel.read(msg, null, this);
return;
}
// do the parsing the IP and length
state = READ_PAYLOAD;
channel.read(payloadBuf, null, this);
break;
case READ_PAYLOAD:
if (payloadBuf.remaining() > 0) {
channel.read(payloadBuf, null, this);
return;
}
payloadBuf.flip();
// get content from payloadBuf
break;
}https://stackoverflow.com/questions/29335113
复制相似问题