首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在返回带有取消的IAsyncEnumerable的函数中迭代IAsyncEnumerable

在返回带有取消的IAsyncEnumerable的函数中迭代IAsyncEnumerable
EN

Stack Overflow用户
提问于 2019-11-07 22:34:58
回答 2查看 2.7K关注 0票数 4

正如标题所述,我必须履行以下职能:

代码语言:javascript
复制
public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
        .AsAsyncEnumerable()
        .WithCancellation(cancellationToken)
        .ConfigureAwait(false))
    {
        yield return job;
    }
}

我很难把我的头绕到取消令牌要去的地方,还有一种唠叨的感觉,觉得我在很多地方都使用它。

当你解构所有花哨的异步东西时,这里到底发生了什么?还有更好的方法来编写这个函数吗?

EN

回答 2

Stack Overflow用户

发布于 2019-11-08 10:36:03

首先,可以将此方法简化为:

代码语言:javascript
复制
public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
{
    return context.Jobs
                  .Where(job => job.Pipeline.Id == pipelineId)
                  .AsAsyncEnumerable();
}

甚至是

代码语言:javascript
复制
public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
    => context.Jobs
              .Where(job => job.Pipeline.Id == pipelineId)
              .AsAsyncEnumerable();

该方法不需要对job进行任何操作,因此不需要对其进行迭代。

取消

如果方法实际使用了job,那么应该在哪里使用取消令牌呢?

让我们把这个方法搞清楚一点。相当于:

代码语言:javascript
复制
public async IAsyncEnumerable<Job> GetByPipeline(
      int pipelineId, 
      [EnumeratorCancellation] CancellationToken ct = default)
{
    //Just a query, doesn't execute anything
    var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);

    //Executes the query and returns the *results* as soon as they arrive in an async stream
    var jobStream=query.AsAsyncEnumerable();

    //Process the results from the async stream as they arrive
    await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
    {
        //Does *that* need cancelling?
        DoSometingExpensive(job);
    }
}

IQueryable query不运行任何东西,它表示查询。不需要取消。

AsAsyncEnumerable()AsEnumerable()ToList()等执行查询并返回一些结果。ToList()等使用所有的结果,而As...Enumerable()方法只在请求时生成结果。查询不能取消,除非请求,否则As_Enumerable()方法不会返回任何内容,因此它们不需要取消。

await foreach将遍历整个异步流,因此如果我们想要中止它,我们确实需要传递取消令牌。

最后,DoSometingExpensive(job);需要取消吗?是不是太贵了,如果要花太长时间,我们就想摆脱它吗?或者我们能等到它完成后才退出循环吗?如果它需要取消,它也将需要CancellationToken。

ConfigureAwait

最后,ConfigureAwait(false)不涉及取消,可能根本不需要。没有它,在每次await执行返回到原始同步上下文之后。在桌面应用程序中,这意味着UI线程。这就是允许我们在异步事件处理程序中修改UI的原因。

如果GetByPipeline运行在桌面应用程序上并希望修改UI,则必须删除ConfugureAwait

代码语言:javascript
复制
await foreach (var job in jobStream.WithCancellation(ct))
{
        //Update the UI
        toolStripProgressBar.Increment(1);
        toolStripStatusLabel.Text=job.Name;
        //Do the actual job
        DoSometingExpensive(job);
}

使用ConfigureAwait(false),在线程池线程上继续执行,我们不能访问UI。

库代码不应影响执行恢复的方式,因此大多数库都使用ConfigureAwait(false),并将最终决定留给UI开发人员。

如果GetByPipeline是库方法,请使用ConfigureAwait(false)

票数 0
EN

Stack Overflow用户

发布于 2019-11-08 14:15:24

假设在实体框架的深处有一个方法GetJobs,它从数据库中检索Job对象:

代码语言:javascript
复制
private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    while (await dataReader.ReadAsync(cancellationToken))
    {
        yield return new Job()
        {
            Id = (int)dataReader["Id"],
            Data = (byte[])dataReader["Data"]
        };
    }
}

现在假设Data属性包含一个巨大的字节数组,其中的数据与Job相关联。检索每个Job的数组可能需要一些非平凡的时间。在这种情况下,中断循环迭代之间是不够的,因为在调用Cancel方法和引发OperationCanceledException之间会有明显的延迟。这就是为什么DbDataReader.ReadAsync方法需要一个CancellationToken,这样查询就可以立即取消。

现在的挑战是如何将客户端代码传递的CancellationToken传递给GetJobs方法,而像context.Jobs这样的属性就在此过程中。解决方案是WithCancellation扩展方法,它存储令牌并将其更深地传递给接受带有EnumeratorCancellation属性修饰的参数的方法。

所以在你的例子中,你做的一切都是正确的。您已经在您的cancellationToken返回方法中包含了一个IAsyncEnumerable参数,这是推荐的做法。这样,连接到您的WithCancellation方法的后续GetByPipeline就不会浪费。然后将WithCancellation链接到方法中的AsAsyncEnumerable之后,这也是正确的。否则,CancellationToken将无法到达其最终目的地,即GetJobs方法。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58757843

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档