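The context of this question is reading data from a stream (IO-bound), processing the data (CPU-bound), and then writing it to another stream (IO-bound).
The naive way looks like this: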
thread 1: loop { |<--read data block from stream-->|<--process data block-->|<--write to stream-->| }
The naive producer-consumer pattern:
thread 1: loop { |<--read data block from stream-->| enqueue data block to blocking queue A }
thread 2: loop { dequeue data block from blocking queue A |<--process data block-->| enqueue data block to blocking queue B }
thread 3: loop { dequeue data block from blocking queue B |<--write to stream-->| }
A stream example looks like this:
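using System.IO;
using System.Security.Cryptography;

using (var hasher = MD5.Create())
using (FileStream readStream = new FileStream("filePath", FileMode.Open))
using (BufferedStream readBs = new BufferedStream(readStream))
using (CryptoStream md5HashStream = new CryptoStream(readBs, hasher, CryptoStreamMode.Read))
using (FileStream writeStream = File.OpenWrite("destPath"))
using (BufferedStream writeBs = new BufferedStream(writeStream))
{
    // Read, hash and write all happen synchronously on one thread, block by block.
    md5HashStream.CopyTo(writeBs);
}
How can I use C# async techniques such as async streams, Channels or Dataflow to turn the stream sample above into a producer-consumer pattern, so as to reduce the time spent blocked on IO?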
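Below is a minimal sketch of how the threads-and-queues pattern above might map onto System.Threading.Channels. It assumes .NET Core 3.0 or later (or the System.Threading.Channels NuGet package) and C# 8, because ChannelReader.ReadAllAsync returns an async stream consumed with await foreach. ChannelPipeline, RunAsync, blockSize, capacity and the process delegate are illustrative names, not part of the question. process stands in for the CPU-bound step; for the MD5 example it could call hasher.TransformBlock on each block and return the block unchanged.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Sketch only: read / process / write as three concurrent tasks linked by
// bounded channels that play the roles of "queue A" and "queue B".
// `process` is a placeholder for the CPU-bound step.
public static class ChannelPipeline
{
    public static async Task RunAsync(Stream input, Stream output, Func<byte[], byte[]> process,
                                      int blockSize = 64 * 1024, int capacity = 4)
    {
        var queueA = Channel.CreateBounded<byte[]>(capacity);  // bounded = back-pressure
        var queueB = Channel.CreateBounded<byte[]>(capacity);
        using var cts = new CancellationTokenSource();         // stops every stage if one fails
        var token = cts.Token;

        // Stage 1 (IO-bound): await each read instead of blocking a thread.
        async Task ReadStage()
        {
            while (true)
            {
                var block = new byte[blockSize];               // fresh array per block
                int n = await input.ReadAsync(block, 0, block.Length, token);
                if (n == 0) break;                             // end of stream
                if (n < block.Length) Array.Resize(ref block, n);
                await queueA.Writer.WriteAsync(block, token);  // waits while queue A is full
            }
        }

        // Stage 2 (CPU-bound): runs on the thread pool so it does not delay the IO stages.
        Task ProcessStage() => Task.Run(async () =>
        {
            await foreach (var block in queueA.Reader.ReadAllAsync(token))
                await queueB.Writer.WriteAsync(process(block), token);
        });

        // Stage 3 (IO-bound): write processed blocks in their original order.
        async Task WriteStage()
        {
            await foreach (var block in queueB.Reader.ReadAllAsync(token))
                await output.WriteAsync(block, 0, block.Length, token);
            await output.FlushAsync(token);
        }

        // Runs one stage, then completes its outgoing channel (if it has one).
        async Task RunStage(Func<Task> stage, ChannelWriter<byte[]> next)
        {
            try
            {
                await stage();
                next?.TryComplete();
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                next?.TryComplete();   // another stage failed; its exception is reported instead
            }
            catch (Exception ex)
            {
                next?.TryComplete(ex); // downstream readers observe the same error
                cts.Cancel();          // unblock stages waiting on full or empty channels
                throw;
            }
        }

        await Task.WhenAll(
            RunStage(ReadStage, queueA.Writer),
            RunStage(ProcessStage, queueB.Writer),
            RunStage(WriteStage, null));
    }
}
```

There is exactly one processing task, so blocks leave queue B in the order they were read. Because the channels are bounded, a fast reader waits for a slow writer instead of buffering the whole file. TPL Dataflow (TransformBlock and ActionBlock with BoundedCapacity, linked with PropagateCompletion) can express a comparable pipeline with less hand-written plumbing.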
Posted on 2020-05-07 11:05:22
You should use Microsoft's Reactive Framework (aka Rx). Install the NuGet package System.Reactive and add using System.Reactive.Linq; and then you can do this:
using System.IO;
using System.Reactive.Linq;
using System.Security.Cryptography;

var query =
    Observable.Using(() => new FileStream(@"filePath", FileMode.Open), readStream =>
    Observable.Using(() => new BufferedStream(readStream), readBs =>
    Observable.Using(() => MD5.Create(), hasher =>
    Observable.Using(() => new CryptoStream(readBs, hasher, CryptoStreamMode.Read), md5HashStream =>
    Observable.Using(() => File.OpenWrite(@"destPath"), writeStream =>
    Observable.Using(() => new BufferedStream(writeStream), writeBs =>
    Observable.FromAsync(() => md5HashStream.CopyToAsync(writeBs))))))));

query.Wait(); // or: await query;
In my tests this consistently ran 2 to 5 times faster than your original code.
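For comparison, here is a minimal sketch of the same copy-and-hash using plain async/await instead of Rx. CopyWithMd5Async is a hypothetical helper name. The sketch makes three changes to the Rx version:
- It opens both files with useAsync: true.
- It uses FileMode.Create, because File.OpenWrite does not truncate an existing, longer file.
- It leaves out the BufferedStream wrappers, because FileStream already keeps an internal buffer.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Tasks;

// Sketch: the same nested resources as the Rx query, disposed by `using`
// statements instead of Observable.Using, with CopyToAsync awaited directly.
// CopyWithMd5Async is a hypothetical helper; it returns the MD5 of the copied data.
public static class AsyncCopy
{
    public static async Task<byte[]> CopyWithMd5Async(string sourcePath, string destPath)
    {
        using (var hasher = MD5.Create())
        {
            using (var readStream = new FileStream(sourcePath, FileMode.Open, FileAccess.Read,
                                                   FileShare.Read, 4096, useAsync: true))
            using (var md5HashStream = new CryptoStream(readStream, hasher, CryptoStreamMode.Read))
            using (var writeStream = new FileStream(destPath, FileMode.Create, FileAccess.Write,
                                                    FileShare.None, 4096, useAsync: true))
            {
                // CryptoStream hashes each buffer as it is read; CopyToAsync awaits
                // each read and then each write instead of blocking the calling thread.
                await md5HashStream.CopyToAsync(writeStream);
            }
            // In read mode the final block is transformed when the CryptoStream reaches
            // end of stream, so the hash is available once the copy has finished.
            return hasher.Hash;
        }
    }
}
```

Like the Rx query, CopyToAsync still reads and then writes one buffer at a time, so reading and writing do not overlap. The difference is that the calling thread is not blocked while the IO is pending.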
Posted on 2020-05-06 16:20:20
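You can use the streams' ReadAsync and WriteAsync methods to await the IO operations, reading a fixed number of bytes (blockSize) into a buffer at a time. However, ReadAsync may return fewer bytes than requested, so you need a loop to make sure you actually read blockSize bytes (or reach the end of the stream).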
using System.IO;
using System.Threading.Tasks;

// This snippet must run inside an async method because it uses await.
int blockSize = 1024;
using (FileStream readStream = new FileStream("filePath", FileMode.Open))
using (BufferedStream readBs = new BufferedStream(readStream))
using (FileStream writeStream = File.OpenWrite("destPath"))
using (BufferedStream writeBs = new BufferedStream(writeStream))
{
    int offset;
    var buffer = new byte[blockSize];
    do
    {
        offset = 0;
        while (offset < buffer.Length)
        { // make sure to read blockSize bytes (fewer only at the end of the stream)
            var bytesRead = await readBs.ReadAsync(buffer, offset, buffer.Length - offset);
            if (bytesRead == 0) break;
            offset += bytesRead;
        }
        if (offset > 0)
        {
            var result = DoSomethingWithData(buffer, offset); // assumption: returns a new byte[] with only the relevant data
            await writeBs.WriteAsync(result, 0, result.Length);
        }
    } while (0 < offset);
    await writeBs.FlushAsync(); // flush asynchronously instead of synchronously in Dispose
}
https://stackoverflow.com/questions/61629637