正如您所知道的,在处理SocketAsyncEventArgs时,可以接收部分消息,根据所使用的协议,您必须处理它。在我的例子中,这是实时的市场数据,每条消息都用\r\n模式分隔开来。为了减少内存分配,我决定为此目的构建一个处理程序,并希望得到您的反馈。基本上,每次收到数据时,我都会写信给处理程序,并检查是否至少有一条已完成的消息。如果是,我把它推到上层。如果不是,等待第二次传输等等。
处理程序测试
public class MessageHandlerTests
{
private MessageHandler _messageHandler;
[SetUp]
public void SetUp()
{
_messageHandler = new MessageHandler(8192, '\n');
}
[Test]
public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_First_Write()
{
// Arrange
var msg = "2008-09-30 16:29:56,26.6000,100,104865900,26.6000,26.6100,2836662,0,0,E,\r\n";
var msgBytes = Encoding.ASCII.GetBytes(msg);
// Act
_messageHandler.Write(msgBytes, 0, msgBytes.Length);
var count = _messageHandler.TryRead(out var readBytes);
// Assert
Assert.AreEqual(count, msg.Length);
Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg);
}
[Test]
public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_Second_Write()
{
// Arrange
var msg1 = "2008-09-30 16:29:56,26.6000,100,104865900";
var msg2 = ",26.6000,26.6100,2836662,0,0,E,\r\n";
var msg1Bytes = Encoding.ASCII.GetBytes(msg1);
var msg2Bytes = Encoding.ASCII.GetBytes(msg2);
// Act
_messageHandler.Write(msg1Bytes, 0, msg1Bytes.Length);
_messageHandler.Write(msg2Bytes, 0, msg2Bytes.Length);
var count = _messageHandler.TryRead(out var readBytes);
// Assert
Assert.AreEqual(count, msg1.Length + msg2.Length);
Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg1 + msg2);
}
[Test]
public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_First_Write_With_Remainder()
{
// Arrange
var msg1 = "2008-09-30 16:29:56,26.6000,100,104865900,26.6000,26.6100,2836662,0,0,E,\r\n";
var msg2 = "2008-09-30 ";
var msgBytes = Encoding.ASCII.GetBytes(msg1 + msg2);
// Act
_messageHandler.Write(msgBytes, 0, msgBytes.Length);
var count = _messageHandler.TryRead(out var readBytes);
// Assert
Assert.AreEqual(count, msg1.Length);
Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg1);
}
[Test]
public void TryRead_Should_Return_Positive_Count_After_Receiving_Delimeter_On_Second_Write_With_Remainder()
{
// Arrange
var msg1 = "2008-09-30 16:29:56,26.6000,100,104865900";
var msg2 = ",26.6000,26.6100,2836662,0,0,E,\r\n";
var msg3 = "2008-09-30 ";
var msg1Bytes = Encoding.ASCII.GetBytes(msg1);
var msg2Bytes = Encoding.ASCII.GetBytes(msg2 + msg3);
// Act
_messageHandler.Write(msg1Bytes, 0, msg1Bytes.Length);
_messageHandler.Write(msg2Bytes, 0, msg2Bytes.Length);
var count = _messageHandler.TryRead(out var readBytes);
// Assert
Assert.AreEqual(count, msg1.Length + msg2.Length);
Assert.AreEqual(Encoding.ASCII.GetString(readBytes, 0, count), msg1 + msg2);
}
[Test]
public void Should_Return_Zero_After_One_Succesful_TryRead()
{
// Arrange
var msg = "2008-09-30 16:29:56,26.6000,100,104865900,26.6000,26.6100,2836662,0,0,E,\r\n";
var msgBytes = Encoding.ASCII.GetBytes(msg);
// Act
byte[] readBytes;
_messageHandler.Write(msgBytes, 0, msgBytes.Length);
var count1 = _messageHandler.TryRead(out readBytes);
var count2 = _messageHandler.TryRead(out readBytes);
// Assert
Assert.Greater(count1, 0);
Assert.AreEqual(count2, 0);
}
}处理程序
public class MessageHandler
{
private readonly char _delimeter;
private readonly MemoryStream _completeStream;
private readonly MemoryStream _remainderStream;
private readonly byte[] _readBytes;
public MessageHandler(int bufferSize, char delimeter)
{
_delimeter = delimeter;
_completeStream = new MemoryStream(bufferSize);
_completeStream.Seek(0, SeekOrigin.Begin);
_remainderStream = new MemoryStream(bufferSize);
_remainderStream.Seek(0, SeekOrigin.Begin);
_readBytes = new byte[bufferSize];
}
public void Write(byte[] message, int offset, int count)
{
// check if delimeter is found
var delimeterIndex = message.GetLastDelimeterIndex(offset, count, _delimeter);
// if not found, simply copy bytes into the remainder and return
if (delimeterIndex == -1)
{
_remainderStream.Write(message, offset, count);
return;
}
// if remainder exists, copy bytes into complete
if (_remainderStream.Position > 0) {
_remainderStream.WriteTo(_completeStream);
_remainderStream.SetLength(0);
}
// copy received bytes with last delimeter into complete
_completeStream.Write(message, offset, delimeterIndex + 1);
// delimeter found at the end of the message
if (delimeterIndex == count - 1)
return;
_remainderStream.Write(message, delimeterIndex + 1, count - delimeterIndex - 1);
}
public int TryRead(out byte[] output)
{
output = null;
if (_completeStream.Position == 0)
return 0;
var length = (int)_completeStream.Length;
_completeStream.Position = 0;
_completeStream.Read(_readBytes, 0, length);
_completeStream.SetLength(0);
output = _readBytes;
return length;
}
}
public static class ByteExtensions
{
public static int GetLastDelimeterIndex(this byte[] buffer, int offset, int length, char delimeter)
{
for (var i = offset + length - 1; i >= offset; i--)
{
if (buffer[i] == delimeter)
return i;
}
return -1;
}
}发布于 2018-06-17 14:43:06
风格好,注释有用,API简单通用.
我希望构造函数中有一个卫士子句,如果我试图误用它,就会提供一个有意义的错误消息。
if (bufferSize <= 0)
throw new ArgumentOutOfRangeException(nameof(bufferSize), "Buffer Size must be greater than 0");同样,可以将它们添加到Write中。
从API的角度来看,我希望每次调用TryRead时都会返回一条消息。它没有,这很好,但内联文档(\\\)在解释预期行为时不会出错。
重用_readBytes并将其从TryRead中传递出去,这有点令人吃惊,而且可能有风险。
另一种内存需求较小的设计是将MessageHandler传递给流,并让它在数据进入时写入所述流。这消除了维护缓冲区(实际上是两个缓冲区)的责任,并显着地简化了API。TryRead代码可以独立实现并与任何MemoryStream一起工作。或者,从字节数组构造所述内存流,并将其直接传递出去。
这将对线程产生相当严重的影响,但我认为,考虑到您的评论和代码,arn并不担心这一点。更重要的是,保持流/缓冲区将允许使用者在调用TryRead之前混淆未来的结果(目前,他们只能丢失他们刚刚收到的数据);但是如果内存是一个大问题,那么这会有帮助。
另一种选择(可能适用也可能不适用)是在读取完整消息时调用事件。如果您相信回调不会弄乱它(在一定程度上已经在MemoryStream中这样做了),那么您可以使用单个D10来完成所有这些工作。
GetLastDelimeterIndex并不是特定于Delimeters的;我把它称为更通用的东西,比如LastIndexOfChar (继Array.LastIndexOf)。
您的测试是好的,但是MessageHandler.Write的每个测试都希望它消耗整个数组。这意味着一大块复杂的逻辑实际上是未经测试的,而且正是这种代码导致了难以找到的bug。
..。事实上,有多个相关的bug。
// copy received bytes with last delimeter into complete
_completeStream.Write(message, offset, delimeterIndex + 1); // should be delimeterIndex - offset + 1
if (delimeterIndex == count - 1) // should be count + offset - 1
return;
_remainderStream.Write(message, delimeterIndex + 1, count + offset - delimeterIndex - 1); // should be count + offset - delimeterIndex - 1我在尝试写这个测试时发现了它们:
public void Partial_Writes_Are_Not_Completely_Broken()
{
// Arrange
var msg = "fish fish fish\r\nfox fox fox";
var msgBytes = Encoding.ASCII.GetBytes(msg);
int startCrop = 5;
int endCrop = 4;
int delimiterIndex = msg.LastIndexOf('\n');
// Act
byte[] readBytes;
_messageHandler.Write(msgBytes, startCrop, msgBytes.Length - startCrop - endCrop);
var count1 = _messageHandler.TryRead(out readBytes);
_messageHandler.Write(msgBytes, delimiterIndex, 1);
var count2 = _messageHandler.TryRead(out readBytes);
// Assert
Assert.AreEqual(count1, delimiterIndex - startCrop + 1);
Assert.AreEqual(count2, msg.Length - delimiterIndex - endCrop);
}您使用的是哪个测试框架?我太懒了,不想尝试计算出来,但是通常在断言调用中,期望值放在第一位,实际值放在第二位。这使我在调试测试时绊倒了。
https://codereview.stackexchange.com/questions/194016
复制相似问题