首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >关于并行运行多个异步任务的说明

关于并行运行多个异步任务的说明
EN

Stack Overflow用户
提问于 2019-04-10 22:37:18
回答 1查看 2.4K关注 0票数 3

编辑:由于舱壁策略需要与WaitAndRetry策略包装,无论如何……我倾向于将示例3作为保持并行性、节流和polly策略重试的最佳解决方案。只是有点奇怪,因为我认为Parallel.ForEach是用于同步操作的,而舱壁则更适合异步操作。

我试图使用polly AsyncBulkheadPolicy并行运行多个异步任务。到目前为止,我的理解是,策略方法ExecuteAsync本身并没有对线程进行调用,而是将其留给默认的TaskScheduler或它之前的某个人。因此,如果我的任务在某种程度上是CPU绑定的,那么在执行任务时需要使用Parallel.ForEach,或者用ExecuteAsync方法执行Task.Run(),以便将任务调度到后台线程。

有人能看看下面的例子并澄清它们在并行性和线程池方面的工作方式吗?

https://github.com/App-vNext/Polly/wiki/Bulkhead -操作:舱壁策略不会创建它自己的线程,它假设我们已经这样做了。

代码语言:javascript
复制
async Task DoSomething(IEnumerable<object> objects);

//Example 1:
//Simple use, but then I don't have access to retry policies from polly
Parallel.ForEach(groupedObjects, (set) =>
{
    var task = DoSomething(set);
    task.Wait();
});

//Example 2:
//Uses default TaskScheduler which may or may not run the tasks in parallel
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
    var task = bulkheadPolicy.ExecuteAsync(async () => DoSomething(set));
    parallelTasks.Add(task);
};

await Task.WhenAll(parallelTasks);

//Example 3:
//seems to defeat the purpose of the bulkhead since Parallel.ForEach and
//PolicyBulkheadAsync can both do throttling...just use basic RetryPolicy
//here? 
Parallel.ForEach(groupedObjects, (set) =>
{
    var task = bulkheadPolicy.ExecuteAsync(async () => DoSomething(set));
    task.Wait();
});


//Example 4:
//Task.Run still uses the default Task scheduler and isn't any different than
//Example 2; just makes more tasks...this is my understanding.
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
    var task = Task.Run(async () => await bulkheadPolicy.ExecuteAsync(async () => DoSomething(set)));
    parallelTasks.Add(task);
};

await Task.WhenAll(parallelTasks);

DoSomething是对一组对象执行操作的异步方法。我希望这种情况发生在并行线程中,同时尊重polly的重试策略并允许节流。

然而,在任务/线程的处理方式方面,我似乎对Parallel.ForEach和使用Bulkhead.ExecuteAsync的具体功能行为感到困惑。

EN

回答 1

Stack Overflow用户

发布于 2019-04-11 04:03:21

您可能是对的,使用Parallel.ForEach违背了舱壁的目的。我认为,一个简单的循环与延迟将完成的工作,使舱壁的任务。虽然我猜在实际的例子中会有一个连续的数据流,而不是预定义的列表或数组。

代码语言:javascript
复制
using Polly;
using Polly.Bulkhead;

static async Task Main(string[] args)
{
    var groupedObjects = Enumerable.Range(0, 10)
        .Select(n => new object[] { n }); // Create 10 sets to work with
    var bulkheadPolicy = Policy
        .BulkheadAsync(3, 3); // maxParallelization, maxQueuingActions
    var parallelTasks = new List<Task>();
    foreach (var set in groupedObjects)
    {
        Console.WriteLine(@$"Scheduling, Available: {bulkheadPolicy
            .BulkheadAvailableCount}, QueueAvailable: {bulkheadPolicy
            .QueueAvailableCount}");

        // Start the task
        var task = bulkheadPolicy.ExecuteAsync(async () =>
        {
            // Await the task without capturing the context
            await DoSomethingAsync(set).ConfigureAwait(false);
        });
        parallelTasks.Add(task);
        await Task.Delay(50); // Interval between scheduling more tasks
    }

    var whenAllTasks = Task.WhenAll(parallelTasks);
    try
    {
        // Await all the tasks (await throws only one of the exceptions)
        await whenAllTasks;
    }
    catch when (whenAllTasks.IsFaulted) // It might also be canceled
    {
        // Ignore rejections, rethrow other exceptions
        whenAllTasks.Exception.Handle(ex => ex is BulkheadRejectedException);
    }
    Console.WriteLine(@$"Processed: {parallelTasks
        .Where(t => t.Status == TaskStatus.RanToCompletion).Count()}");
    Console.WriteLine($"Faulted: {parallelTasks.Where(t => t.IsFaulted).Count()}");
}

static async Task DoSomethingAsync(IEnumerable<object> set)
{
    // Pretend we are doing something with the set
    await Task.Delay(500).ConfigureAwait(false);
}

输出:

代码语言:javascript
复制
Scheduling, Available: 3, QueueAvailable: 3
Scheduling, Available: 2, QueueAvailable: 3
Scheduling, Available: 1, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 2
Scheduling, Available: 0, QueueAvailable: 1
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 1
Processed: 7
Faulted: 3

在Fiddle上试试

更新:是一个稍微逼真的DoSomethingAsync版本,它实际上迫使CPU做一些实际的工作(在我的四核计算机中,CPU利用率接近100% )。

代码语言:javascript
复制
private static async Task DoSomethingAsync(IEnumerable<object> objects)
{
    await Task.Run(() =>
    {
        long sum = 0; for (int i = 0; i < 500000000; i++) sum += i;
    }).ConfigureAwait(false);
}

此方法并不是针对所有数据集运行的。它只为没有被舱壁拒绝的设置运行。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55622225

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档