我正在努力学习.NET 6.0中的管道。我要做的是简单地实现HTTP/1.1服务器。没有什么稀奇古怪的,可能对大多数现实生活中的情况都不适用,但这并不重要。
为了简单起见,假设我正在实现echo服务器。
抛开解析不谈,真正困扰我的是从NetworkStream读取数据。
我的处理分为几个阶段:
newline).
我在填写PipeWriter时做错了什么。在某个时候,它挂起网络流的整个读取,并在5秒后返回数据(不包括解析时间,但这是几毫秒的问题)。
当我只是读取和解析来自客户端的所有行(标头等)时,一切都很好,但当我到达需要加载body并回显它的时候,它就会咬我一口。那就是5s出现的地方。
我之所以使用Stream,是因为通过在类中提供MemoryStream并将其与网络堆栈分开测试,可以更容易地完成很少的单元测试。
internal class PipelineStreamReader : IDisposable
{
private readonly ILogger<PipelineStreamReader> logger;
private readonly Pipe pipe;
private readonly Task writing;
private bool disposed;
/// <summary>
/// Create (possibly reusable) pipeline reader.
/// </summary>
/// <param name="stream">Most probably duplex NetworkStream, but could be MemoryStream in tests.</param>
/// <param name="logger">Just logger.</param>
public PipelineStreamReader(Stream stream, ILogger<PipelineStreamReader> logger)
{
this.logger = logger;
pipe = new Pipe();
writing = FillPipelineAsync(pipe.Writer, stream);
}
public Task<ReadOnlyMemory<char>[]> ReadLines(CancellationToken token)
{
return ReadLinesFromPipelineAsync(pipe.Reader, token);
}
public async ValueTask<ReadOnlyMemory<byte>> ReadBody(int length, ContentType contentType)
{
try
{
if (contentType.Charset != null)
{
return await ReadBodyFromPipelineAsync(pipe.Reader, length, contentType.Charset);
}
throw new NotImplementedException();
}
catch (ArgumentException e)
{
logger.LogError(e, "Incorrect length of body!");
await pipe.Reader.CompleteAsync();
await pipe.Writer.CompleteAsync();
throw;
}
}
private async Task FillPipelineAsync(PipeWriter writer, Stream input)
{
while (true)
{
// Allocate at least 16 bytes from the PipeWriter, could be more
var memory = writer.GetMemory(16);
try
{
var bytesRead = await input.ReadAsync(memory);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket
writer.Advance(bytesRead);
}
catch (IOException fuck) when (disposed)
{
// TODO RK: Something doesn't dispose correctly
// Not the brightest solution, but hey, it works...
}
catch (Exception ex)
{
Debug.Fail("Something failed!");
logger.LogError(ex, "TODO af");
throw;
}
// Make the data available to the PipeReader
var result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// Tell the PipeReader that there's no more data coming
await writer.CompleteAsync();
}
private static async ValueTask<ReadOnlyMemory<byte>> ReadBodyFromPipelineAsync(PipeReader reader, int length, Encoding encoding)
{
var body = new ArrayBufferWriter<byte>();
var charsRead = 0L;
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
ReadBufferUsingEncoding(buffer, body, encoding, ref charsRead);
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.Start, buffer.End);
// There shouldn't be more bytes to read than what was provided in headers
if (charsRead > length)
throw new ArgumentException("Too much data");
// Stop reading if there's no more data coming
if (charsRead == length)
{
break;
}
if (result.IsCompleted)
{
break;
}
}
// Check characters count
if (charsRead < length)
throw new ArgumentException("Wrong length");
return body.WrittenMemory;
}
private static void ReadBufferUsingEncoding(
ReadOnlySequence<byte> seq,
IBufferWriter<byte> buffer,
Encoding encoding,
ref long charactersCount)
{
var segment = seq.Start;
while (seq.TryGet(ref segment, out var memory))
{
buffer.Write(memory.Span);
charactersCount += encoding.GetCharCount(memory.Span);
}
}
private static async Task<ReadOnlyMemory<char>[]> ReadLinesFromPipelineAsync(PipeReader reader, CancellationToken token)
{
var list = new List<ReadOnlyMemory<char>>(16);
var endOfLines = false;
while (!endOfLines)
{
// Wait for data
var read = await reader.ReadAsync(token);
// Save to temporary buffer for reading lines
var currentBuffer = read.Buffer;
// check for end or completed stream (error in this case)
if (read.Buffer.IsEmpty && read.IsCompleted)
throw new InvalidOperationException("That shouldn't happen");
SequencePosition? consumed = null;
while (true)
{
// check if there is newline
var newLine = currentBuffer.PositionOf((byte) '\n');
// No newline, so let writer drop more data into buffers
if (newLine == null) break;
// we have newline, but let's check for windows carriage
var carriage = currentBuffer.PositionOf((byte) '\r');
var pos = carriage ?? newLine.Value;
var line = ProcessLine(currentBuffer.Slice(0, pos));
// advance current buffer for next line read and update consumed data
var offset = carriage is null ? 1 : 2;
var newPos = currentBuffer.GetPosition(offset, pos);
currentBuffer = currentBuffer.Slice(newPos);
// check if it's last line and update position to remove last newline
if (line.Length == 0)
{
consumed = read.Buffer.GetPosition(0, newPos);
endOfLines = true;
break;
}
// otherwise add line and update consumed data normally
list.Add(line);
consumed = read.Buffer.GetPosition(0, pos);
}
// clear consumed data
reader.AdvanceTo(consumed ?? read.Buffer.Start, read.Buffer.End);
}
return list.ToArray();
}
private static ReadOnlyMemory<char> ProcessLine(ReadOnlySequence<byte> seq)
{
var buffer = new ArrayBufferWriter<char>();
var segment = seq.Start;
while (seq.TryGet(ref segment, out var memory))
{
Encoding.ASCII.GetChars(memory.Span, buffer);
}
return buffer.WrittenMemory;
}
protected virtual void Dispose(bool disposing)
{
if (disposed) return;
if (!disposing) return;
pipe.Reader.Complete();
pipe.Writer.Complete();
disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}更新
抱歉,我忘了问问题了。就像我说的,问题是我的代码每次被请求时都会延迟5秒。解析需要5秒的延迟+几毫秒的时间。
我不知道原因是什么,我做错了什么。这似乎是网络堆栈上的东西在咬我,但我不知道。
发布于 2022-03-14 18:19:36
好吧,看了其他密码后我找到了解决办法。
我仍然不知道在这么长时间内集中数据的确切原因是什么,但是在FillPipeline方法中添加简单的检查数据可用性解决了我的问题:
private async Task FillPipelineAsync(PipeWriter writer, Stream input)
{
while (true)
{
// Allocate at least 16 bytes from the PipeWriter, could be more
var memory = writer.GetMemory(16);
try
{
// escape if no data is available
if (!FirstRead && input is NetworkStream {DataAvailable: false})
break;
var bytesRead = await input.ReadAsync(memory);
if (bytesRead == 0)
break;
// Tell the PipeWriter how much was read from the Socket
writer.Advance(bytesRead);
}
catch (IOException fuck) when (disposed)
{
logger.LogError(fuck, "fuck");
// TODO RK: Something doesn't dispose correctly
}
catch (Exception ex)
{
Debug.Fail("Something failed!");
logger.LogError(ex, "TODO af");
throw;
}
// Make the data available to the PipeReader
var result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// Tell the PipeReader that there's no more data coming
await writer.CompleteAsync();
}https://stackoverflow.com/questions/71457343
复制相似问题