首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >是否可以在Parallel.ForEachAsync 6.0中节流以避免速率限制?

是否可以在Parallel.ForEachAsync 6.0中节流以避免速率限制?
EN

Stack Overflow用户
提问于 2021-12-09 14:31:11
回答 1查看 618关注 0票数 2

我对编程相当陌生(< 3年的经验),所以我对这篇文章中的主题没有很好的理解。请容忍我。

我的团队正在开发与第三方系统的集成,而第三方的一个端点缺乏一种有意义的方法来获取符合条件的实体列表。

我们一直通过循环遍历请求集合来获取这些实体,并将每个等待调用的结果添加到一个列表中。这很好,但是获取实体比从其他端点获取实体要长得多,这些端点允许我们通过提供ids列表来获取实体列表。

.NET 6.0引入了Parallel.ForEachAsync(),它允许我们并行地异步执行多个可访问的任务。

例如:

代码语言:javascript
复制
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(List<IRestRequest> requests) 
where TEntity : IEntity
{
    var entities = new ConcurrentBag<TEntity>();

    // Create a function that takes a RestRequest and returns the 
    // result of the request's execution, for each request
    var requestExecutionTasks = requests.Select(i => 
        new Func<Task<TEntity>>(() => GetAsync<TEntity>(i)));

    // Execute each of the functions asynchronously in parallel, 
    // and add the results to the aggregate as they come in
    await Parallel.ForEachAsync(requestExecutionTasks, new ParallelOptions
    {
        // This lets us limit the number of threads to use. -1 is unlimited
        MaxDegreeOfParallelism = -1 
    }, async (func, _) => entities.Add(await func()));

    return entities.ToList();
}

使用此代码而不是简单的foreach-循环会加快获得测试实例上的~30个实体所需的时间,平均速度为91%。太棒了。然而,我们担心的是,当我们在可能有数千个实体的客户端系统上使用它时,可能会出现速率限制。我们已经有了一个系统,可以从他们的API中检测到"you is rate limited"-message,并在再次尝试之前提示请求一秒钟左右,但这并不是一个好的解决方案,因为它是一种安全措施。

如果我们只遍历请求,就可以通过在循环的每一次迭代中执行类似await Task.Delay(minimumDelay)的操作来抑制调用。如果我错了,请纠正我,但据我所知,在并行执行请求时,这实际上是行不通的,因为它会使所有请求在执行之前等待相同的时间。是否有办法让每个人的请求在执行前等待一定的时间,只有当我们接近于利率限制的时候?如果可能的话,我想在不限制要使用的线程数量的情况下这样做。

编辑

我想让这个问题稍微放一放,这样更多的人可以回答。由于没有新的答案或评论已经添加,我是标记一个我得到的答案是正确的。话虽如此,答案似乎表明了一种与使用Parallel.ForEachAsync不同的方法。

如果我正确地理解了当前的答案,我最初提出的关于是否有可能节流Parallel.ForEachAsync的问题的答案是:“不,它不是”。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-09 16:18:13

我的建议是放弃Parallel.ForEachAsync方法,而使用新的Chunk LINQ运算符与Task.WhenAll方法相结合。您可以每秒钟启动100个异步操作,如下所示:

代码语言:javascript
复制
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
    List<IRestRequest> requests) where TEntity : IEntity
{
    var tasks = new List<Task<TEntity>>();
    foreach (var chunk in requests.Chunk(100))
    {
        tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
        await Task.Delay(TimeSpan.FromSeconds(1.0));
    }
    return (await Task.WhenAll(tasks)).ToList();
}

假设启动异步操作(调用GetAsync方法)所需的时间可以忽略不计。

这种方法固有的缺点是,在出现异常时,在完成所有操作之前不会传播故障。作为比较,Parallel.ForEachAsync方法在检测到第一个故障后停止调用异步委托并尽快完成。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70291671

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档