首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用SocketAsyncEventArgs处理部分消息

使用SocketAsyncEventArgs处理部分消息
EN

Code Review用户
提问于 2018-05-09 13:42:48
回答 1查看 268关注 0票数 5

正如您所知道的,在处理SocketAsyncEventArgs时,可以接收部分消息,根据所使用的协议,您必须处理它。在我的例子中,这是实时的市场数据,每条消息都用\r\n模式分隔开来。为了减少内存分配,我决定为此目的构建一个处理程序,并希望得到您的反馈。基本上,每次收到数据时,我都会写信给处理程序,并检查是否至少有一条已完成的消息。如果是,我把它推到上层。如果不是,等待第二次传输等等。

处理程序测试

代码语言:javascript
复制
public class MessageHandlerTests
{
    private MessageHandler _messageHandler;

    [SetUp]
    public void SetUp()
    {
        _messageHandler = new MessageHandler(8192, '\n');
    }

    [Test]
    public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_First_Write()
    {
        // Arrange
        var msg = "2008-09-30 16:29:56,26.6000,100,104865900,26.6000,26.6100,2836662,0,0,E,\r\n";
        var msgBytes = Encoding.ASCII.GetBytes(msg);

        // Act
        _messageHandler.Write(msgBytes, 0, msgBytes.Length);
        var count = _messageHandler.TryRead(out var readBytes);

        // Assert
        Assert.AreEqual(count, msg.Length);
        Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg);
    }

    [Test]
    public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_Second_Write()
    {
        // Arrange
        var msg1 = "2008-09-30 16:29:56,26.6000,100,104865900";
        var msg2 = ",26.6000,26.6100,2836662,0,0,E,\r\n";
        var msg1Bytes = Encoding.ASCII.GetBytes(msg1);
        var msg2Bytes = Encoding.ASCII.GetBytes(msg2);

        // Act
        _messageHandler.Write(msg1Bytes, 0, msg1Bytes.Length);
        _messageHandler.Write(msg2Bytes, 0, msg2Bytes.Length);
        var count = _messageHandler.TryRead(out var readBytes);

        // Assert
        Assert.AreEqual(count, msg1.Length + msg2.Length);
        Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg1 + msg2);
    }

    [Test]
    public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_First_Write_With_Remainder()
    {
        // Arrange
        var msg1 = "2008-09-30 16:29:56,26.6000,100,104865900,26.6000,26.6100,2836662,0,0,E,\r\n";
        var msg2 = "2008-09-30 ";
        var msgBytes = Encoding.ASCII.GetBytes(msg1 + msg2);

        // Act
        _messageHandler.Write(msgBytes, 0, msgBytes.Length);
        var count = _messageHandler.TryRead(out var readBytes);

        // Assert
        Assert.AreEqual(count, msg1.Length);
        Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg1);
    }

    [Test]
    public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_Second_Write_With_Remainder()
    {
        // Arrange
        var msg1 = "2008-09-30 16:29:56,26.6000,100,104865900";
        var msg2 = ",26.6000,26.6100,2836662,0,0,E,\r\n";
        var msg3 = "2008-09-30 ";
        var msg1Bytes = Encoding.ASCII.GetBytes(msg1);
        var msg2Bytes = Encoding.ASCII.GetBytes(msg2 + msg3);

        // Act
        _messageHandler.Write(msg1Bytes, 0, msg1Bytes.Length);
        _messageHandler.Write(msg2Bytes, 0, msg2Bytes.Length);
        var count = _messageHandler.TryRead(out var readBytes);

        // Assert
        Assert.AreEqual(count, msg1.Length + msg2.Length);
        Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg1 + msg2);
    }

    [Test]
    public void Should_Return_Zero_After_One_Succesful_TryRead()
    {
        // Arrange
        var msg = "2008-09-30 16:29:56,26.6000,100,104865900,26.6000,26.6100,2836662,0,0,E,\r\n";
        var msgBytes = Encoding.ASCII.GetBytes(msg);

        // Act
        byte[] readBytes;
        _messageHandler.Write(msgBytes, 0, msgBytes.Length);
        var count1 = _messageHandler.TryRead(out readBytes);
        var count2 = _messageHandler.TryRead(out readBytes);

        // Assert
        Assert.Greater(count1, 0);
        Assert.AreEqual(count2, 0);
    }
}

处理程序

代码语言:javascript
复制
public class MessageHandler
{
    private readonly char _delimeter;

    private readonly MemoryStream _completeStream;
    private readonly MemoryStream _remainderStream;
    private readonly byte[] _readBytes;

    public MessageHandler(int bufferSize, char delimeter)
    {
        _delimeter = delimeter;

        _completeStream = new MemoryStream(bufferSize);
        _completeStream.Seek(0, SeekOrigin.Begin);

        _remainderStream = new MemoryStream(bufferSize);
        _remainderStream.Seek(0, SeekOrigin.Begin);

        _readBytes = new byte[bufferSize];
    }

    public void Write(byte[] message, int offset, int count)
    {
        // check if delimeter is found
        var delimeterIndex = message.GetLastDelimeterIndex(offset, count, _delimeter);

        // if not found, simply copy bytes into the remainder and return
        if (delimeterIndex == -1)
        {
            _remainderStream.Write(message, offset, count);
            return;
        }

        // if remainder exists, copy bytes into complete
        if (_remainderStream.Position > 0) {
            _remainderStream.WriteTo(_completeStream);
            _remainderStream.SetLength(0);
        }
        // copy received bytes with last delimeter into complete
        _completeStream.Write(message, offset, delimeterIndex + 1);

        // delimeter found at the end of the message
        if (delimeterIndex == count - 1)
            return;

        _remainderStream.Write(message, delimeterIndex + 1, count - delimeterIndex - 1);
    }

    public int TryRead(out byte[] output)
    {
        output = null;
        if (_completeStream.Position == 0)
            return 0;

        var length = (int)_completeStream.Length;
        _completeStream.Position = 0;
        _completeStream.Read(_readBytes, 0, length);
        _completeStream.SetLength(0);
        output = _readBytes;
        return length;
    }
}

public static class ByteExtensions
{
    public static int GetLastDelimeterIndex(this byte[] buffer, int offset, int length, char delimeter)
    {
        for (var i = offset + length - 1; i >= offset; i--)
        {
            if (buffer[i] == delimeter)
                return i;
        }
        return -1;
    }
}
EN

回答 1

Code Review用户

回答已采纳

发布于 2018-06-17 14:43:06

风格好,注释有用,API简单通用.

我希望构造函数中有一个卫士子句,如果我试图误用它,就会提供一个有意义的错误消息。

代码语言:javascript
复制
if (bufferSize <= 0)
    throw new ArgumentOutOfRangeException(nameof(bufferSize), "Buffer Size must be greater than 0");

同样,可以将它们添加到Write中。

从API的角度来看,我希望每次调用TryRead时都会返回一条消息。它没有,这很好,但内联文档(\\\)在解释预期行为时不会出错。

重用_readBytes并将其从TryRead中传递出去,这有点令人吃惊,而且可能有风险。

另一种内存需求较小的设计是将MessageHandler传递给流,并让它在数据进入时写入所述流。这消除了维护缓冲区(实际上是两个缓冲区)的责任,并显着地简化了API。TryRead代码可以独立实现并与任何MemoryStream一起工作。或者,从字节数组构造所述内存流,并将其直接传递出去。

这将对线程产生相当严重的影响,但我认为,考虑到您的评论和代码,arn并不担心这一点。更重要的是,保持流/缓冲区将允许使用者在调用TryRead之前混淆未来的结果(目前,他们只能丢失他们刚刚收到的数据);但是如果内存是一个大问题,那么这会有帮助。

另一种选择(可能适用也可能不适用)是在读取完整消息时调用事件。如果您相信回调不会弄乱它(在一定程度上已经在MemoryStream中这样做了),那么您可以使用单个D10来完成所有这些工作。

GetLastDelimeterIndex并不是特定于Delimeters的;我把它称为更通用的东西,比如LastIndexOfChar (继Array.LastIndexOf)。

您的测试是好的,但是MessageHandler.Write的每个测试都希望它消耗整个数组。这意味着一大块复杂的逻辑实际上是未经测试的,而且正是这种代码导致了难以找到的bug。

..。事实上,有多个相关的bug。

代码语言:javascript
复制
// copy received bytes with last delimeter into complete
_completeStream.Write(message, offset, delimeterIndex + 1); // should be delimeterIndex - offset + 1

if (delimeterIndex == count - 1) // should be count + offset - 1
    return;

 _remainderStream.Write(message, delimeterIndex + 1, count + offset - delimeterIndex - 1); // should be count + offset - delimeterIndex - 1

我在尝试写这个测试时发现了它们:

代码语言:javascript
复制
public void Partial_Writes_Are_Not_Completely_Broken()
{
    // Arrange
    var msg = "fish fish fish\r\nfox fox fox";
    var msgBytes = Encoding.ASCII.GetBytes(msg);
    int startCrop = 5;
    int endCrop = 4;
    int delimiterIndex = msg.LastIndexOf('\n');

    // Act
    byte[] readBytes;
    _messageHandler.Write(msgBytes, startCrop, msgBytes.Length - startCrop - endCrop);
    var count1 = _messageHandler.TryRead(out readBytes);
    _messageHandler.Write(msgBytes, delimiterIndex, 1);
    var count2 = _messageHandler.TryRead(out readBytes);

    // Assert
    Assert.AreEqual(count1, delimiterIndex - startCrop + 1);
    Assert.AreEqual(count2, msg.Length - delimiterIndex - endCrop);
}

您使用的是哪个测试框架?我太懒了,不想尝试计算出来,但是通常在断言调用中,期望值放在第一位,实际值放在第二位。这使我在调试测试时绊倒了。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/194016

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档