首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Netty ByteToMessageDecoder未接收以不同tcp数据包发送的消息

Netty ByteToMessageDecoder未接收以不同tcp数据包发送的消息
EN

Stack Overflow用户
提问于 2019-09-11 21:58:15
回答 2查看 274关注 0票数 1

自定义ByteToMessageEncoder不接收在同一tcp连接中发送的字节,而是在不同tcp消息中发送的字节。

我被指派去解决一个问题,这个问题是几年前的旧系统开始出现问题的。在我看来,有一个由其他开发人员用netty编写的tcp服务器,它接收具有静态长度报头和可变长度主体的二进制消息。正文长度由告知消息类型的报头字段定义。我们维护一个messagetype及其长度的映射。

所面临的问题是,在正确解码报头并知道主体长度之后,相同的解码器期望主体出现在相同的ByteBuf中(即来自byteChannel的一个fireChannelRead事件)。

然而,有时缓冲区中没有足够的内容,所以解码器放弃了。但是下一次调用decode-method时,主体字节会出现,并被错误地解释为头部,从而使解码器不同步。

使用netty组装消息的正确方式是什么,因为消息的字节可能会以较小的块插入?

以下是当前解码器的基础知识。

代码语言:javascript
复制
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Message message = decode(ctx, in); 
        if (message != null) {
            out.add(message);
        }
    }

    protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < MessageHeader.SIZE) {
            return null;
        }
        ByteBuf headerBytes = in.readBytes(MessageHeader.SIZE);
        MessageHeader header = MessageHeader.decode(headerBytes, new ProcessingTracker(MessagePart.HEADER));
        if (header == null) {
            ctx.disconnect().sync();
            logger.debug("Disconnected from channel");
            return null;
        }
        int bodySize = header.getMessageType().getMessageBodySize();
        if (!waitingForBytes(in, bodySize, READ_TRY_TIMES)) {
            ctx.disconnect().sync();
            logger.debug("Disconnected from channel");
            return null;
        }
        ByteBuf messageBytes = in.readBytes(bodySize);
        messageBytes.resetReaderIndex();
        Message message = Message.decode(header, messageBytes, 0);
        return message;
    }


    public boolean waitingForBytes(ByteBuf in, int bodySize, int counter) {
        if (counter == 0) {
            logger.warn("Didn't get enough bytes of message body in MessagDecoder. Giving up and disconnecting from remote peer.");
            return false;
        }
        logger.debug(String.format("Readable bytes in buffer %d, expected %d", in.readableBytes(), bodySize));

        if (in.readableBytes() < bodySize) {

            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return waitingForBytes(in, bodySize, counter - 1);
        } else {
            return true;
        }
    }
EN

回答 2

Stack Overflow用户

发布于 2019-09-12 16:50:07

您的代码中存在多个问题...

首先,不允许在代码中调用sync(),因为这样会“死锁”EventLoop

其次,您不能在这里使用waitingForBytes,因为它基本上会使EventLoop上的所有其他IO失效,这意味着您再也不能继续执行任何IO了。在像Netty这样的框架中,永远不要阻塞EventLoop线程是很重要的,因为这基本上会导致所有东西都变得陈旧,并且不会有任何进展。

票数 0
EN

Stack Overflow用户

发布于 2019-09-12 22:34:29

看看超类ByteToMessageDecoder,很明显,子类应该通过它读取的字节或解码的消息数量来传达解码过程。我认为我继承这段代码的人忽略了这一点。

这个实现通过了一些初始测试:

代码语言:javascript
复制
public class MessageDecoder extends ByteToMessageDecoder {

    private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);

    private ByteBuf headBuf = Unpooled.buffer(MessageHeader.SIZE);

    private MessageHeader header = null;

    private ByteBuf bodyBuf;

    private int bodylength = 0;

    private int messageBytes = 0;

    private final static int STATE_READ_HEADER = 1, STATE_READ_BODY = 2;

    private int state = STATE_READ_HEADER;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Message message = decode(ctx, in);
        if (message != null) {
            out.add(message);
        }
    }

    protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        int readBytes = 0;
        logstate(in);
        switch (state) {
        case STATE_READ_HEADER:
            if (in.readableBytes() <= MessageHeader.SIZE - messageBytes) {
                readBytes = in.readableBytes();
            } else {
                readBytes = MessageHeader.SIZE - messageBytes;
            }
            headBuf.writeBytes(in, readBytes);
            messageBytes += readBytes;
            if (messageBytes == MessageHeader.SIZE) {
                state = STATE_READ_BODY;
                header = MessageHeader.decode(headBuf, new ProcessingTracker(MessagePart.HEADER));
                bodylength = header.getMessageType().getMessageBodySize();
                bodyBuf = Unpooled.buffer(bodylength);
            }
            break;
        case STATE_READ_BODY:
            if (in.readableBytes() <= bodylength - (messageBytes - MessageHeader.SIZE)) {
                readBytes = in.readableBytes();
            } else {
                readBytes = bodylength - (messageBytes - MessageHeader.SIZE);
            }
            bodyBuf.writeBytes(in, readBytes);
            messageBytes += readBytes;
            if (messageBytes == MessageHeader.SIZE + bodylength) {
                state = STATE_READ_HEADER;
                Message message = Message.decode(header, bodyBuf, 0);
                reset();
                return message;
            }
            break;
        }
        return null;
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57890814

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档