首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在C#中使用异步技巧来处理IO和CPU混合绑定的任务?

如何在C#中使用异步技巧来处理IO和CPU混合绑定的任务?
EN

Stack Overflow用户
提问于 2020-05-06 15:28:57
回答 2查看 115关注 0票数 1

这个问题的背景是从一个流(IO绑定)读取数据,处理数据(CPU绑定),然后写入另一个流(IO绑定)。

天真的方式是这样的

代码语言:javascript
复制
thread 1: loop { |<--read data block from stream-->|<--process data block-->|<--write to stream-->| }

天真的生产者-消费者模式

代码语言:javascript
复制
thread 1: loop { |<--read data block from stream-->| enqueue data block to blocking queue A }

thread 2: loop { dequeue data block from blocking queue A |<--process data block-->| enqueue data block to blocking queue B }

thread 3: loop { dequeue data block from blocking queue B |<--write to stream-->| }

一个流示例如下

代码语言:javascript
复制
var hasher = MD5.Create();
using (FileStream readStream = new FileStream("filePath", FileMode.Open))
using (BufferedStream readBs = new BufferedStream(readStream ))
using (CryptoStream md5HashStream = new CryptoStream(readBs, hasher, CryptoStreamMode.Read))
using (FileStream writeStream= File.OpenWrite("destPath"))
using (BufferedStream writeBs = new BufferedStream(writeStream))
{
    md5HashStream.CopyTo(writeBs);
}

如何使用C#异步技巧,如异步流、通道、数据流,将上述流样本转换为生产者-客户模式,以减少阻塞io时间?

EN

回答 2

Stack Overflow用户

发布于 2020-05-07 11:05:22

你应该使用微软的Reactive Framework (又名Rx) -- NuGet System.Reactive并添加using System.Reactive.Linq; --然后你就可以这样做了:

代码语言:javascript
复制
var query =
    Observable.Using(() => new FileStream(@"filePath", FileMode.Open), readStream =>
    Observable.Using(() => new BufferedStream(readStream), readBs =>
    Observable.Using(() => MD5.Create(), hasher =>
    Observable.Using(() => new CryptoStream(readBs, hasher, CryptoStreamMode.Read), md5HashStream =>
    Observable.Using(() => File.OpenWrite(@"destPath"), writeStream => Observable.Using(() => new BufferedStream(writeStream), writeBs =>
    Observable.FromAsync(() => md5HashStream.CopyToAsync(writeBs))))))));

query.Wait(); // or await query;

我得到的结果总是比你的原始代码快2到5倍。

票数 0
EN

Stack Overflow用户

发布于 2020-05-06 16:20:20

您可以使用streams的ReadAsyncWriteAsync方法来等待io操作,并将一些固定的blockSize字节数读入缓冲区。但是,由于ReadAsync可能会读取所需的较少字节,因此您需要确保使用循环读取blockSize字节。

代码语言:javascript
复制
int blockSize = 1024;

using (FileStream readStream = new FileStream("filePath", FileMode.Open))
using (BufferedStream readBs = new BufferedStream(readStream ))
using (FileStream writeStream = File.OpenWrite("destPath"))
using (BufferedStream writeBs = new BufferedStream(writeStream))
{
    int offset;
    var buffer = new byte[blockSize];
    do {
        offset = 0;
        while (offset < buffer.Length) 
        { // make sure to read blockSize bytes
            var bytesRead = await readBs.ReadAsync(buffer, offset, buffer.Length - offset);
            if (bytesRead == 0) break;
            offset += bytesRead;
        }

        if (offset > 0) 
        {
            var result = DoSomethingWithData(buffer, offset); // assumtion: retuns a new byte[] with only relevant data

            await writeBs.WriteAsync(result, 0, result.Length);
        }
    } while (0 < offset);
}
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61629637

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档