我有两个使用SemaphoreSlim的循环和一个字符串“内容”数组。
前环:
var allTasks = new List<Task>();
var throttle = new SemaphoreSlim(10,10);
foreach (string s in Contents)
{
await throttle.WaitAsync();
allTasks.Add(
Task.Run(async () =>
{
try
{
rootResponse.Add(await POSTAsync(s, siteurl, src, target));
}
finally
{
throttle.Release();
}
}));
}
await Task.WhenAll(allTasks);A循环:
var allTasks = new List<Task>();
var throttle = new SemaphoreSlim(10,10);
for(int s=0;s<Contents.Count;s++)
{
await throttle.WaitAsync();
allTasks.Add(
Task.Run(async () =>
{
try
{
rootResponse[s] = await POSTAsync(Contents[s], siteurl, src, target);
}
finally
{
throttle.Release();
}
}));
}
await Task.WhenAll(allTasks);第一个foreach循环运行良好,但是for循环Task.WhenAll(allTasks)返回一个OutOfRangeException,我希望Contents[]索引和列表索引匹配。
我能修一下for循环吗?还是有更好的方法?
发布于 2018-10-21 04:56:13
这将解决您当前的问题。
for (int s = 0; s < Contents.Count; s++)
{
var content = Contents[s];
allTasks.Add(
Task.Run(async () =>
{
await throttle.WaitAsync();
try
{
rootResponse[s] = await POSTAsync(content, siteurl, src, target);
}
finally
{
throttle.Release();
}
}));
}
await Task.WhenAll(allTasks);然而,这是一段相当混乱和令人讨厌的代码。这个看起来更整洁一些。
public static async Task DoStuffAsync(Content[] contents, string siteurl, string src, string target)
{
var throttle = new SemaphoreSlim(10, 10);
// local method
async Task<(Content, SomeResponse)> PostAsyncWrapper(Content content)
{
await throttle.WaitAsync();
try
{
// return a content and result pair
return (content, await PostAsync(content, siteurl, src, target));
}
finally
{
throttle.Release();
}
}
var results = await Task.WhenAll(contents.Select(PostAsyncWrapper));
// do stuff with your results pairs here
}还有很多其他方法可以这样做:PLinq、Parallel.For、Parallel.ForEach,或者像上面那样整理循环中的捕获。
但是,由于您有一个IO绑定的工作负载,并且您有运行它的async方法。最合适的解决方案是async await模式,它既不适合Parallel.For,也不适合Parallel.ForEach。
另一种方法是TPL DataFlow库,它可以在努基特包中找到。
码
public static async Task DoStuffAsync(Content[] contents, string siteurl, string src, string target)
{
async Task<(Content, SomeResponse)> PostAsyncWrapper(Content content)
{
return (content, await PostAsync(content, siteurl, src, target));
}
var bufferblock = new BufferBlock<(Content, SomeResponse)>();
var actionBlock = new TransformBlock<Content, (Content, SomeResponse)>(
content => PostAsyncWrapper(content),
new ExecutionDataflowBlockOptions
{
EnsureOrdered = false,
MaxDegreeOfParallelism = 100,
SingleProducerConstrained = true
});
actionBlock.LinkTo(bufferblock);
foreach (var content in contents)
actionBlock.Post(content);
actionBlock.Complete();
await actionBlock.Completion;
if (bufferblock.TryReceiveAll(out var result))
{
// do stuff with your results pairs here
}
}基本上,这会创建一个BufferBlock和TransformBlock,您将工作负载泵入TransformBlock,它在其选项中具有一定程度的并行性,并将它们推入BufferBlock,等待完成并得到结果。
为什么是Dataflow?因为它处理的是async await,所以它有MaxDegreeOfParallelism,它专为IO绑定或与CPU绑定的工作负载设计,而且使用起来非常简单。此外,由于大多数数据通常以多种方式(在管道中)处理,因此您可以使用它来按顺序、并行或以您选择的任何方式对数据流进行管道和操作。
不管怎样祝你好运
https://stackoverflow.com/questions/52911847
复制相似问题