首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用System.IO.Pipelines和Span<T>的HTTP回波服务器

使用System.IO.Pipelines和Span<T>的HTTP回波服务器
EN

Stack Overflow用户
提问于 2022-03-13 13:43:26
回答 1查看 270关注 0票数 0

我正在努力学习.NET 6.0中的管道。我要做的是简单地实现HTTP/1.1服务器。没有什么稀奇古怪的,可能对大多数现实生活中的情况都不适用,但这并不重要。

为了简单起见,假设我正在实现echo服务器。

抛开解析不谈,真正困扰我的是从NetworkStream读取数据。

我的处理分为几个阶段:

newline).

  • Parsing

  • 通过headers行和标头读取所有行(只要我找到空的

  • 请求来获取主体的大小,就可以读取所有行)。

  • 根据提供的长度和编码头从流中获取身体。

  • 将其作为正确的HTTP响应返回。

我在填写PipeWriter时做错了什么。在某个时候,它挂起网络流的整个读取,并在5秒后返回数据(不包括解析时间,但这是几毫秒的问题)。

当我只是读取和解析来自客户端的所有行(标头等)时,一切都很好,但当我到达需要加载body并回显它的时候,它就会咬我一口。那就是5s出现的地方。

我之所以使用Stream,是因为通过在类中提供MemoryStream并将其与网络堆栈分开测试,可以更容易地完成很少的单元测试。

代码语言:javascript
复制
internal class PipelineStreamReader : IDisposable
{
    private readonly ILogger<PipelineStreamReader> logger;
    private readonly Pipe pipe;
    private readonly Task writing;
    private bool disposed;

    /// <summary>
    /// Create (possibly reusable) pipeline reader.
    /// </summary>
    /// <param name="stream">Most probably duplex NetworkStream, but could be MemoryStream in tests.</param>
    /// <param name="logger">Just logger.</param>
    public PipelineStreamReader(Stream stream, ILogger<PipelineStreamReader> logger)
    {
        this.logger = logger;

        pipe = new Pipe();
        writing = FillPipelineAsync(pipe.Writer, stream);
    }

    public Task<ReadOnlyMemory<char>[]> ReadLines(CancellationToken token)
    {
        return ReadLinesFromPipelineAsync(pipe.Reader, token);
    }
    
    public async ValueTask<ReadOnlyMemory<byte>> ReadBody(int length, ContentType contentType)
    {
        try
        {
            if (contentType.Charset != null)
            {
                return await ReadBodyFromPipelineAsync(pipe.Reader, length, contentType.Charset);
            }

            throw new NotImplementedException();
        }
        catch (ArgumentException e)
        {
            logger.LogError(e, "Incorrect length of body!");
            await pipe.Reader.CompleteAsync();
            await pipe.Writer.CompleteAsync();
            throw;
        }
    }

    private async Task FillPipelineAsync(PipeWriter writer, Stream input)
    {
        while (true)
        {
            // Allocate at least 16 bytes from the PipeWriter, could be more
            var memory = writer.GetMemory(16);
            try
            {
                var bytesRead = await input.ReadAsync(memory);
                if (bytesRead == 0)
                {
                    break;
                }

                // Tell the PipeWriter how much was read from the Socket
                writer.Advance(bytesRead);
            }
            catch (IOException fuck) when (disposed)
            {
                // TODO RK: Something doesn't dispose correctly
                // Not the brightest solution, but hey, it works...
            }
            catch (Exception ex)
            {
                Debug.Fail("Something failed!");
                logger.LogError(ex, "TODO af");
                throw;
            }

            // Make the data available to the PipeReader
            var result = await writer.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        // Tell the PipeReader that there's no more data coming
        await writer.CompleteAsync();
    }

    private static async ValueTask<ReadOnlyMemory<byte>> ReadBodyFromPipelineAsync(PipeReader reader, int length, Encoding encoding)
    {
        var body = new ArrayBufferWriter<byte>();
        var charsRead = 0L;

        while (true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;

            ReadBufferUsingEncoding(buffer, body, encoding, ref charsRead);

            // Tell the PipeReader how much of the buffer we have consumed
            reader.AdvanceTo(buffer.Start, buffer.End);

            // There shouldn't be more bytes to read than what was provided in headers
            if (charsRead > length)
                throw new ArgumentException("Too much data");

            // Stop reading if there's no more data coming
            if (charsRead == length)
            {
                break;
            }

            if (result.IsCompleted)
            {
                break;
            }
        }

        // Check characters count
        if (charsRead < length)
            throw new ArgumentException("Wrong length");

        return body.WrittenMemory;
    }

    private static void ReadBufferUsingEncoding(
        ReadOnlySequence<byte> seq,
        IBufferWriter<byte> buffer,
        Encoding encoding,
        ref long charactersCount)
    {
        var segment = seq.Start;

        while (seq.TryGet(ref segment, out var memory))
        {
            buffer.Write(memory.Span);
            charactersCount += encoding.GetCharCount(memory.Span);
        }
    }

    private static async Task<ReadOnlyMemory<char>[]> ReadLinesFromPipelineAsync(PipeReader reader, CancellationToken token)
    {
        var list = new List<ReadOnlyMemory<char>>(16);
        var endOfLines = false;

        while (!endOfLines)
        {
            // Wait for data
            var read = await reader.ReadAsync(token);

            // Save to temporary buffer for reading lines
            var currentBuffer = read.Buffer;

            // check for end or completed stream (error in this case)
            if (read.Buffer.IsEmpty && read.IsCompleted)
                throw new InvalidOperationException("That shouldn't happen");

            SequencePosition? consumed = null;

            while (true)
            {
                // check if there is newline
                var newLine = currentBuffer.PositionOf((byte) '\n');

                // No newline, so let writer drop more data into buffers
                if (newLine == null) break;

                // we have newline, but let's check for windows carriage
                var carriage = currentBuffer.PositionOf((byte) '\r');
                var pos = carriage ?? newLine.Value;
                var line = ProcessLine(currentBuffer.Slice(0, pos));

                // advance current buffer for next line read and update consumed data
                var offset = carriage is null ? 1 : 2;
                var newPos = currentBuffer.GetPosition(offset, pos);
                currentBuffer = currentBuffer.Slice(newPos);

                // check if it's last line and update position to remove last newline
                if (line.Length == 0)
                {
                    consumed = read.Buffer.GetPosition(0, newPos);
                    endOfLines = true;
                    break;
                }

                // otherwise add line and update consumed data normally
                list.Add(line);
                consumed = read.Buffer.GetPosition(0, pos);
            }

            // clear consumed data
            reader.AdvanceTo(consumed ?? read.Buffer.Start, read.Buffer.End);
        }

        return list.ToArray();
    }

    private static ReadOnlyMemory<char> ProcessLine(ReadOnlySequence<byte> seq)
    {
        var buffer = new ArrayBufferWriter<char>();
        var segment = seq.Start;

        while (seq.TryGet(ref segment, out var memory))
        {
            Encoding.ASCII.GetChars(memory.Span, buffer);
        }

        return buffer.WrittenMemory;
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposed) return;

        if (!disposing) return;

        pipe.Reader.Complete();
        pipe.Writer.Complete();

        disposed = true;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}

更新

抱歉,我忘了问问题了。就像我说的,问题是我的代码每次被请求时都会延迟5秒。解析需要5秒的延迟+几毫秒的时间。

我不知道原因是什么,我做错了什么。这似乎是网络堆栈上的东西在咬我,但我不知道。

EN

回答 1

Stack Overflow用户

发布于 2022-03-14 18:19:36

好吧,看了其他密码后我找到了解决办法。

我仍然不知道在这么长时间内集中数据的确切原因是什么,但是在FillPipeline方法中添加简单的检查数据可用性解决了我的问题:

代码语言:javascript
复制
private async Task FillPipelineAsync(PipeWriter writer, Stream input)
{
    while (true)
    {
        // Allocate at least 16 bytes from the PipeWriter, could be more
        var memory = writer.GetMemory(16);
        try
        {
            // escape if no data is available
            if (!FirstRead && input is NetworkStream {DataAvailable: false})
                break;

            var bytesRead = await input.ReadAsync(memory);
            if (bytesRead == 0)
                break;

            // Tell the PipeWriter how much was read from the Socket
            writer.Advance(bytesRead);
        }
        catch (IOException fuck) when (disposed)
        {
            logger.LogError(fuck, "fuck");
            // TODO RK: Something doesn't dispose correctly
        }
        catch (Exception ex)
        {
            Debug.Fail("Something failed!");
            logger.LogError(ex, "TODO af");
            throw;
        }

        // Make the data available to the PipeReader
        var result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // Tell the PipeReader that there's no more data coming
    await writer.CompleteAsync();
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71457343

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档