注:请允许我为这个问题的长度道歉,我不得不在里面放了很多信息。我希望这不会导致太多的人简单地略读并做出假设。请阅读全文。谢谢。
我有一个通过套接字传入的数据流。这些数据是面向行的。
我使用的是.NET (BeginRead等)的APM (异步编程方法)。这排除了使用基于流的I/O,因为异步I/O是基于缓冲区的。可以重新打包数据并将其发送到流,例如内存流,但也存在问题。
问题是我的输入流(我不能控制它)没有给我任何关于流有多长的信息。它只是一个换行流,看起来像这样:
COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....因此,使用APM,因为我不知道任何给定的数据集将有多长,所以很可能数据块将跨越缓冲区边界,需要多次读取,但这些多次读取也将跨越多个数据块。
示例:
Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
"ine\n.............................More Lines..."我的第一个想法是使用StringBuilder并简单地将缓冲行附加到SB。这在某种程度上是有效的,但我发现很难提取数据块。我尝试使用StringReader读取新行数据,但无法知道是否获得了完整的行,因为StringReader在添加的最后一个块的末尾返回部分行,然后返回null。没有办法知道返回的是不是整行换行的数据。
示例:
// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine(); // returns "This is incomp.."更糟糕的是,如果我只是继续追加数据,缓冲区会变得越来越大,因为这可能会一次运行几周或几个月,这不是一个好的解决方案。
我的下一个想法是在读取数据块时从SB中删除它们。这需要编写我自己的ReadLine函数,但是在读写过程中我不得不锁定数据。此外,较大的数据块(可能包括数百次读取和兆字节数据)需要扫描整个缓冲区以查找新行。它效率不高,而且相当难看。
我正在寻找的东西,具有简单的StreamReader/Writer与方便的异步I/O。
我的下一个想法是使用MemoryStream,并将数据块写入内存流,然后将StreamReader附加到流并使用ReadLine,但我再次遇到了了解缓冲区中的最后一次读取是否是完整行的问题,而且从流中删除“陈旧”数据更加困难。
我还考虑过使用同步读取的线程。这样做的好处是,使用StreamReader时,它总是从ReadLine()返回整行,除非在连接中断的情况下。但是,这会导致取消连接的问题,并且某些类型的网络问题可能会导致套接字长时间挂起阻塞。我之所以使用异步IO,是因为我不想让线程在程序的生命周期中阻塞数据接收。
这种联系是持久的。随着时间的推移,数据将继续流动。在初始连接期间,有大量的数据流,一旦数据流完成,套接字将保持打开状态,等待实时更新。我不知道初始流什么时候“结束”,因为知道的唯一方法就是不再立即发送数据。这意味着我不能在处理之前等待初始数据加载完成,当它到来时,我几乎被“实时”处理。
那么,有没有人能建议一种好的方法,以一种不太复杂的方式来处理这种情况?我真的希望这尽可能简单和优雅,但由于所有的边缘情况,我不断提出越来越复杂的解决方案。我猜我想要的是某种FIFO,在这种FIFO中,我可以很容易地添加更多的数据,同时从其中取出符合特定条件的数据(例如,以换行符结尾的字符串)。
发布于 2009-02-08 01:33:22
这是一个相当有趣的问题。我过去的解决方案是使用一个独立的线程进行同步操作,正如您所建议的那样。(我设法解决了使用锁和大量异常处理程序阻塞套接字的大多数问题。)尽管如此,使用内置异步操作通常是明智的,因为它允许真正的OS级异步I/O,所以我理解您的观点。
好吧,我已经写了一个类来完成我认为你需要的东西(我会说是以一种相对干净的方式)。让我知道你的想法。
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
public class AsyncStreamProcessor : IDisposable
{
protected StringBuilder _buffer; // Buffer for unprocessed data.
private bool _isDisposed = false; // True if object has been disposed
public AsyncStreamProcessor()
{
_buffer = null;
}
public IEnumerable<string> Process(byte[] newData)
{
// Note: replace the following encoding method with whatever you are reading.
// The trick here is to add an extra line break to the new data so that the algorithm recognises
// a single line break at the end of the new data.
using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
{
// Read all lines from new data, returning all but the last.
// The last line is guaranteed to be incomplete (or possibly complete except for the line break,
// which will be processed with the next packet of data).
string line, prevLine = null;
while ((line = newDataReader.ReadLine()) != null)
{
if (prevLine != null)
{
yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
_buffer = null;
}
prevLine = line;
}
// Store last incomplete line in buffer.
if (_buffer == null)
// Note: the (* 2) gives you the prediction of the length of the incomplete line,
// so that the buffer does not have to be expanded in most/all situations.
// Change it to whatever seems appropiate.
_buffer = new StringBuilder(prevLine, prevLine.Length * 2);
else
_buffer.Append(prevLine);
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (!_isDisposed)
{
if (disposing)
{
// Dispose managed resources.
_buffer = null;
GC.Collect();
}
// Dispose native resources.
// Remember that object has been disposed.
_isDisposed = true;
}
}
}应该为每个NetworkStream创建一个该类的实例,并且应该在接收到新数据时调用流程函数(在BeginRead的回调方法中,在调用下一个BeginRead之前)。
注意:我只用测试数据验证了这个代码,并没有通过网络传输的实际数据。然而,我预计不会有任何不同。
另外,警告说这个类当然不是线程安全的,但是只要在处理完当前数据之前不会再次执行BeginRead (正如我假设您正在做的那样),应该不会有任何问题。
希望这对你有用。如果还有问题,请让我知道,我会尝试修改解决方案来处理它们。(尽管我仔细阅读了这个问题,但我很可能忽略了其中的一些微妙之处!)
发布于 2009-02-08 01:13:52
你在问题中的解释让我想起了ASCIZ字符串。(link text)。这可能是一个有益的开始。
在大学里,我不得不为我正在做的一个项目写一些类似的东西。不幸的是,我控制了发送套接字,所以我插入了一段消息字段作为协议的一部分。但是,我认为类似的方法可能会使您受益。
我的解决方案是这样的:我会发送类似5HELLO的东西,所以首先我会看到5,并且知道我的消息长度是5,因此我需要的消息是5个字符。但是,如果在异步读取时,我只能读取5HE,我将看到消息长度为5,但我只能读取3个字节(假设是ASCII字符)。正因为如此,我知道我遗漏了一些字节,并将我拥有的存储在片段缓冲区中。我为每个套接字设置了一个片段缓冲区,从而避免了任何同步问题。粗略的过程是。
然后你使用另一个线程来读取你的非通信消息队列,或者只是让线程池使用委托来处理它。做任何你必须做的数据处理。如果我错了,有人会纠正我,但这样的线程同步问题很少,因为你只能在任何时候从套接字读取或等待读取,所以不用担心锁(除非你在填充队列,我在我的实现中使用了委托)。有几个细节你需要自己解决,比如留下多大的frag缓冲区,如果你在读的时候收到0个换行符,整个消息必须附加到片段缓冲区中,而不能覆盖任何内容。我想它最后运行了我大约700 - 800行代码,但这包括连接设置的东西,加密的协商,以及其他一些事情。
这个设置对我来说执行得非常好;我能够在100 80Mbps的以太网lan上执行高达80 80Mbps的操作,使用这个实现的1.8 was,包括加密处理。由于您绑定到套接字,因此服务器将进行扩展,因为可以同时处理多个套接字。如果您需要按顺序处理项目,则需要使用队列,但如果顺序并不重要,那么委托将在线程池之外为您提供非常可伸缩的性能。
希望这会有所帮助,并不意味着是一个完整的解决方案,而是一个开始寻找的方向。
*只需注意,我的实现纯粹是字节级的,并且支持加密,我在示例中使用了字符,以使其更容易可视化。
https://stackoverflow.com/questions/524818
复制相似问题