我想用一个外循环和一个内环进行计算,我可以并行地做。此外,我想使用基于异步/等待的编程模型。在外部循环中,有一个需要资源的地方,只能由一个线程使用。
我考虑使用ForEachAsync实现循环,并使用SemaphoreSlim限制对资源的访问。
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class Program {
private static Dictionary<int,IReadOnlyList<int>> resource = new();
private static SemaphoreSlim semaphore = new(1);
public static async Task Main() {
var outerLoopSource = Enumerable.Range(0,10);
await Parallel.ForEachAsync(outerLoopSource, OuterLoopFunction);
foreach(var (key, list) in resource)
Console.WriteLine(key+": "+string.Join(',', list));
}
public static async ValueTask OuterLoopFunction(int i, CancellationToken cancel) {
// some time consuming calculation ...
var key = i%3;
const int listSize = 10;
IReadOnlyList<int> list;
await semaphore.WaitAsync();
try {
if(!resource.TryGetValue(key, out list)) {
var newList = new int[listSize];
list = newList;
resource.Add(key, list);
await Parallel.ForEachAsync(Enumerable.Range(0,listSize), InnerLoopFunction);
ValueTask InnerLoopFunction(int j, CancellationToken cancel) {
// some time consuming calculation ...
newList[j] = 42+i;
return ValueTask.CompletedTask;
}
}
} finally {
semaphore.Release();
}
// do something with list
}
}ForEachAsync可以在这样的嵌套循环中使用吗?并行操作的数量是否仍然受到System.Environment.ProcessorCount的限制?
更新
在评论中,人们建议使用任务并行库中的数据流组件。如果我从头开始编写代码,这可能是更好的方法。然而,在我的例子中,有相当多的遗留代码在进行计算,在我看来,为了应用这个概念,我需要对它进行重大的重构,因为我必须在与外部循环相同的级别上提升当前的内部循环。因此,我想知道是否使用烦恼SemaphoreSlim来限制并行执行的数量,就像描述的那样,这里避免了并行运行到许多任务/线程,而没有太多的性能损失。
发布于 2022-03-17 13:45:10
不,ParallelOptions.MaxDegreeOfParallelism只影响配置的Parallel.ForEachAsync循环的并行度。它不是影响可能嵌套在外部并行循环中的所有其他并行循环的环境属性。例如,如果使用MaxDegreeOfParallelism = 5配置外部并行循环,用MaxDegreeOfParallelism = 3配置内部并行循环,则可以在任何给定时刻并发调用内并行循环的委托15次(5 * 3)。
这假设内部并行循环是不受限制的。在您的示例中,您已经使用SemaphoreSlim(1)将内部并行循环封装在受保护区域中。因此,在任何给定时刻,只能有一个内部并行循环处于活动状态。内环的委托的最大并发调用数是Environment.ProcessorCount ( Parallel.ForEachAsync API的默认MaxDegreeOfParallelism )。
https://stackoverflow.com/questions/71513038
复制相似问题