首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >什么是处理基于线路的网络I/O流的好方法?

什么是处理基于线路的网络I/O流的好方法?
EN

Stack Overflow用户
提问于 2009-02-07 23:31:33
回答 2查看 1.1K关注 0票数 6

注:请允许我为这个问题的长度道歉,我不得不在里面放了很多信息。我希望这不会导致太多的人简单地略读并做出假设。请阅读全文。谢谢。

我有一个通过套接字传入的数据流。这些数据是面向行的。

我使用的是.NET (BeginRead等)的APM (异步编程方法)。这排除了使用基于流的I/O,因为异步I/O是基于缓冲区的。可以重新打包数据并将其发送到流,例如内存流,但也存在问题。

问题是我的输入流(我不能控制它)没有给我任何关于流有多长的信息。它只是一个换行流,看起来像这样:

代码语言:javascript
复制
COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

因此,使用APM,因为我不知道任何给定的数据集将有多长,所以很可能数据块将跨越缓冲区边界,需要多次读取,但这些多次读取也将跨越多个数据块。

示例:

代码语言:javascript
复制
Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

我的第一个想法是使用StringBuilder并简单地将缓冲行附加到SB。这在某种程度上是有效的,但我发现很难提取数据块。我尝试使用StringReader读取新行数据,但无法知道是否获得了完整的行,因为StringReader在添加的最后一个块的末尾返回部分行,然后返回null。没有办法知道返回的是不是整行换行的数据。

示例:

代码语言:javascript
复制
// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

更糟糕的是,如果我只是继续追加数据,缓冲区会变得越来越大,因为这可能会一次运行几周或几个月,这不是一个好的解决方案。

我的下一个想法是在读取数据块时从SB中删除它们。这需要编写我自己的ReadLine函数,但是在读写过程中我不得不锁定数据。此外,较大的数据块(可能包括数百次读取和兆字节数据)需要扫描整个缓冲区以查找新行。它效率不高,而且相当难看。

我正在寻找的东西,具有简单的StreamReader/Writer与方便的异步I/O。

我的下一个想法是使用MemoryStream,并将数据块写入内存流,然后将StreamReader附加到流并使用ReadLine,但我再次遇到了了解缓冲区中的最后一次读取是否是完整行的问题,而且从流中删除“陈旧”数据更加困难。

我还考虑过使用同步读取的线程。这样做的好处是,使用StreamReader时,它总是从ReadLine()返回整行,除非在连接中断的情况下。但是,这会导致取消连接的问题,并且某些类型的网络问题可能会导致套接字长时间挂起阻塞。我之所以使用异步IO,是因为我不想让线程在程序的生命周期中阻塞数据接收。

这种联系是持久的。随着时间的推移,数据将继续流动。在初始连接期间,有大量的数据流,一旦数据流完成,套接字将保持打开状态,等待实时更新。我不知道初始流什么时候“结束”,因为知道的唯一方法就是不再立即发送数据。这意味着我不能在处理之前等待初始数据加载完成,当它到来时,我几乎被“实时”处理。

那么,有没有人能建议一种好的方法,以一种不太复杂的方式来处理这种情况?我真的希望这尽可能简单和优雅,但由于所有的边缘情况,我不断提出越来越复杂的解决方案。我猜我想要的是某种FIFO,在这种FIFO中,我可以很容易地添加更多的数据,同时从其中取出符合特定条件的数据(例如,以换行符结尾的字符串)。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2009-02-08 01:33:22

这是一个相当有趣的问题。我过去的解决方案是使用一个独立的线程进行同步操作,正如您所建议的那样。(我设法解决了使用锁和大量异常处理程序阻塞套接字的大多数问题。)尽管如此,使用内置异步操作通常是明智的,因为它允许真正的OS级异步I/O,所以我理解您的观点。

好吧,我已经写了一个类来完成我认为你需要的东西(我会说是以一种相对干净的方式)。让我知道你的想法。

代码语言:javascript
复制
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
                // so that the buffer does not have to be expanded in most/all situations. 
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

应该为每个NetworkStream创建一个该类的实例,并且应该在接收到新数据时调用流程函数(在BeginRead的回调方法中,在调用下一个BeginRead之前)。

注意:我只用测试数据验证了这个代码,并没有通过网络传输的实际数据。然而,我预计不会有任何不同。

另外,警告说这个类当然不是线程安全的,但是只要在处理完当前数据之前不会再次执行BeginRead (正如我假设您正在做的那样),应该不会有任何问题。

希望这对你有用。如果还有问题,请让我知道,我会尝试修改解决方案来处理它们。(尽管我仔细阅读了这个问题,但我很可能忽略了其中的一些微妙之处!)

票数 5
EN

Stack Overflow用户

发布于 2009-02-08 01:13:52

你在问题中的解释让我想起了ASCIZ字符串。(link text)。这可能是一个有益的开始。

在大学里,我不得不为我正在做的一个项目写一些类似的东西。不幸的是,我控制了发送套接字,所以我插入了一段消息字段作为协议的一部分。但是,我认为类似的方法可能会使您受益。

我的解决方案是这样的:我会发送类似5HELLO的东西,所以首先我会看到5,并且知道我的消息长度是5,因此我需要的消息是5个字符。但是,如果在异步读取时,我只能读取5HE,我将看到消息长度为5,但我只能读取3个字节(假设是ASCII字符)。正因为如此,我知道我遗漏了一些字节,并将我拥有的存储在片段缓冲区中。我为每个套接字设置了一个片段缓冲区,从而避免了任何同步问题。粗略的过程是。

  1. 从套接字读取到一个字节数组,记录多少字节被读取
  2. 逐字节扫描,直到你找到一个换行符(如果你没有接收到ascii字符,但字符可以是多个字节,这将变得非常复杂,你可以自己处理)
  3. 将你的frag buffer转换成一个字符串,并追加你的读取缓冲区,直到新行到达它。将此字符串作为已完成的消息放到一个队列或它自己的委托中进行处理。(你可以通过让你的读套接字写到与你的片段相同的字节数组来优化这些缓冲区,但这很难进行explain)
  4. Continue循环,每次我们找到一个新行,从记录的开始/结束位置从字节排列创建一个字符串,并放在队列/委托上对于processing.
  5. Once,我们到达了读缓冲区的末尾,将剩下的任何东西复制到frag缓冲区中。
  6. 调用套接字上的BeginRead,这将跳转到步骤1。当套接字中有数据时。

然后你使用另一个线程来读取你的非通信消息队列,或者只是让线程池使用委托来处理它。做任何你必须做的数据处理。如果我错了,有人会纠正我,但这样的线程同步问题很少,因为你只能在任何时候从套接字读取或等待读取,所以不用担心锁(除非你在填充队列,我在我的实现中使用了委托)。有几个细节你需要自己解决,比如留下多大的frag缓冲区,如果你在读的时候收到0个换行符,整个消息必须附加到片段缓冲区中,而不能覆盖任何内容。我想它最后运行了我大约700 - 800行代码,但这包括连接设置的东西,加密的协商,以及其他一些事情。

这个设置对我来说执行得非常好;我能够在100 80Mbps的以太网lan上执行高达80 80Mbps的操作,使用这个实现的1.8 was,包括加密处理。由于您绑定到套接字,因此服务器将进行扩展,因为可以同时处理多个套接字。如果您需要按顺序处理项目,则需要使用队列,但如果顺序并不重要,那么委托将在线程池之外为您提供非常可伸缩的性能。

希望这会有所帮助,并不意味着是一个完整的解决方案,而是一个开始寻找的方向。

*只需注意,我的实现纯粹是字节级的,并且支持加密,我在示例中使用了字符,以使其更容易可视化。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/524818

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档