自定义ByteToMessageEncoder不接收在同一tcp连接中发送的字节,而是在不同tcp消息中发送的字节。
我被指派去解决一个问题,这个问题是几年前的旧系统开始出现问题的。在我看来,有一个由其他开发人员用netty编写的tcp服务器,它接收具有静态长度报头和可变长度主体的二进制消息。正文长度由告知消息类型的报头字段定义。我们维护一个messagetype及其长度的映射。
所面临的问题是,在正确解码报头并知道主体长度之后,相同的解码器期望主体出现在相同的ByteBuf中(即来自byteChannel的一个fireChannelRead事件)。
然而,有时缓冲区中没有足够的内容,所以解码器放弃了。但是下一次调用decode-method时,主体字节会出现,并被错误地解释为头部,从而使解码器不同步。
使用netty组装消息的正确方式是什么,因为消息的字节可能会以较小的块插入?
以下是当前解码器的基础知识。
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Message message = decode(ctx, in);
if (message != null) {
out.add(message);
}
}
protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < MessageHeader.SIZE) {
return null;
}
ByteBuf headerBytes = in.readBytes(MessageHeader.SIZE);
MessageHeader header = MessageHeader.decode(headerBytes, new ProcessingTracker(MessagePart.HEADER));
if (header == null) {
ctx.disconnect().sync();
logger.debug("Disconnected from channel");
return null;
}
int bodySize = header.getMessageType().getMessageBodySize();
if (!waitingForBytes(in, bodySize, READ_TRY_TIMES)) {
ctx.disconnect().sync();
logger.debug("Disconnected from channel");
return null;
}
ByteBuf messageBytes = in.readBytes(bodySize);
messageBytes.resetReaderIndex();
Message message = Message.decode(header, messageBytes, 0);
return message;
}
public boolean waitingForBytes(ByteBuf in, int bodySize, int counter) {
if (counter == 0) {
logger.warn("Didn't get enough bytes of message body in MessagDecoder. Giving up and disconnecting from remote peer.");
return false;
}
logger.debug(String.format("Readable bytes in buffer %d, expected %d", in.readableBytes(), bodySize));
if (in.readableBytes() < bodySize) {
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return waitingForBytes(in, bodySize, counter - 1);
} else {
return true;
}
}发布于 2019-09-12 16:50:07
您的代码中存在多个问题...
首先,不允许在代码中调用sync(),因为这样会“死锁”EventLoop。
其次,您不能在这里使用waitingForBytes,因为它基本上会使EventLoop上的所有其他IO失效,这意味着您再也不能继续执行任何IO了。在像Netty这样的框架中,永远不要阻塞EventLoop线程是很重要的,因为这基本上会导致所有东西都变得陈旧,并且不会有任何进展。
发布于 2019-09-12 22:34:29
看看超类ByteToMessageDecoder,很明显,子类应该通过它读取的字节或解码的消息数量来传达解码过程。我认为我继承这段代码的人忽略了这一点。
这个实现通过了一些初始测试:
public class MessageDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
private ByteBuf headBuf = Unpooled.buffer(MessageHeader.SIZE);
private MessageHeader header = null;
private ByteBuf bodyBuf;
private int bodylength = 0;
private int messageBytes = 0;
private final static int STATE_READ_HEADER = 1, STATE_READ_BODY = 2;
private int state = STATE_READ_HEADER;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Message message = decode(ctx, in);
if (message != null) {
out.add(message);
}
}
protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
int readBytes = 0;
logstate(in);
switch (state) {
case STATE_READ_HEADER:
if (in.readableBytes() <= MessageHeader.SIZE - messageBytes) {
readBytes = in.readableBytes();
} else {
readBytes = MessageHeader.SIZE - messageBytes;
}
headBuf.writeBytes(in, readBytes);
messageBytes += readBytes;
if (messageBytes == MessageHeader.SIZE) {
state = STATE_READ_BODY;
header = MessageHeader.decode(headBuf, new ProcessingTracker(MessagePart.HEADER));
bodylength = header.getMessageType().getMessageBodySize();
bodyBuf = Unpooled.buffer(bodylength);
}
break;
case STATE_READ_BODY:
if (in.readableBytes() <= bodylength - (messageBytes - MessageHeader.SIZE)) {
readBytes = in.readableBytes();
} else {
readBytes = bodylength - (messageBytes - MessageHeader.SIZE);
}
bodyBuf.writeBytes(in, readBytes);
messageBytes += readBytes;
if (messageBytes == MessageHeader.SIZE + bodylength) {
state = STATE_READ_HEADER;
Message message = Message.decode(header, bodyBuf, 0);
reset();
return message;
}
break;
}
return null;
}
}https://stackoverflow.com/questions/57890814
复制相似问题