正如标题所述,我必须履行以下职能:
public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
.AsAsyncEnumerable()
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
yield return job;
}
}我很难把我的头绕到取消令牌要去的地方,还有一种唠叨的感觉,觉得我在很多地方都使用它。
当你解构所有花哨的异步东西时,这里到底发生了什么?还有更好的方法来编写这个函数吗?
发布于 2019-11-08 10:36:03
首先,可以将此方法简化为:
public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
{
return context.Jobs
.Where(job => job.Pipeline.Id == pipelineId)
.AsAsyncEnumerable();
}甚至是
public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
=> context.Jobs
.Where(job => job.Pipeline.Id == pipelineId)
.AsAsyncEnumerable();该方法不需要对job进行任何操作,因此不需要对其进行迭代。
取消
如果方法实际使用了job,那么应该在哪里使用取消令牌呢?
让我们把这个方法搞清楚一点。相当于:
public async IAsyncEnumerable<Job> GetByPipeline(
int pipelineId,
[EnumeratorCancellation] CancellationToken ct = default)
{
//Just a query, doesn't execute anything
var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);
//Executes the query and returns the *results* as soon as they arrive in an async stream
var jobStream=query.AsAsyncEnumerable();
//Process the results from the async stream as they arrive
await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
{
//Does *that* need cancelling?
DoSometingExpensive(job);
}
}IQueryable query不运行任何东西,它表示查询。不需要取消。
AsAsyncEnumerable()、AsEnumerable()、ToList()等执行查询并返回一些结果。ToList()等使用所有的结果,而As...Enumerable()方法只在请求时生成结果。查询不能取消,除非请求,否则As_Enumerable()方法不会返回任何内容,因此它们不需要取消。
await foreach将遍历整个异步流,因此如果我们想要中止它,我们确实需要传递取消令牌。
最后,DoSometingExpensive(job);需要取消吗?是不是太贵了,如果要花太长时间,我们就想摆脱它吗?或者我们能等到它完成后才退出循环吗?如果它需要取消,它也将需要CancellationToken。
ConfigureAwait
最后,ConfigureAwait(false)不涉及取消,可能根本不需要。没有它,在每次await执行返回到原始同步上下文之后。在桌面应用程序中,这意味着UI线程。这就是允许我们在异步事件处理程序中修改UI的原因。
如果GetByPipeline运行在桌面应用程序上并希望修改UI,则必须删除ConfugureAwait:
await foreach (var job in jobStream.WithCancellation(ct))
{
//Update the UI
toolStripProgressBar.Increment(1);
toolStripStatusLabel.Text=job.Name;
//Do the actual job
DoSometingExpensive(job);
}使用ConfigureAwait(false),在线程池线程上继续执行,我们不能访问UI。
库代码不应影响执行恢复的方式,因此大多数库都使用ConfigureAwait(false),并将最终决定留给UI开发人员。
如果GetByPipeline是库方法,请使用ConfigureAwait(false)。
发布于 2019-11-08 14:15:24
假设在实体框架的深处有一个方法GetJobs,它从数据库中检索Job对象:
private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
while (await dataReader.ReadAsync(cancellationToken))
{
yield return new Job()
{
Id = (int)dataReader["Id"],
Data = (byte[])dataReader["Data"]
};
}
}现在假设Data属性包含一个巨大的字节数组,其中的数据与Job相关联。检索每个Job的数组可能需要一些非平凡的时间。在这种情况下,中断循环在迭代之间是不够的,因为在调用Cancel方法和引发OperationCanceledException之间会有明显的延迟。这就是为什么DbDataReader.ReadAsync方法需要一个CancellationToken,这样查询就可以立即取消。
现在的挑战是如何将客户端代码传递的CancellationToken传递给GetJobs方法,而像context.Jobs这样的属性就在此过程中。解决方案是WithCancellation扩展方法,它存储令牌并将其更深地传递给接受带有EnumeratorCancellation属性修饰的参数的方法。
所以在你的例子中,你做的一切都是正确的。您已经在您的cancellationToken返回方法中包含了一个IAsyncEnumerable参数,这是推荐的做法。这样,连接到您的WithCancellation方法的后续GetByPipeline就不会浪费。然后将WithCancellation链接到方法中的AsAsyncEnumerable之后,这也是正确的。否则,CancellationToken将无法到达其最终目的地,即GetJobs方法。
https://stackoverflow.com/questions/58757843
复制相似问题