首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为每个并行循环或任何替代并行循环并行?

为每个并行循环或任何替代并行循环并行?
EN

Stack Overflow用户
提问于 2019-09-26 14:04:50
回答 2查看 2.4K关注 0票数 1

我有这个密码

代码语言:javascript
复制
Lines.ToList().ForEach(y =>
{
    globalQueue.AddRange(GetTasks(y.LineCode).ToList());
});

因此,对于我的行列表中的每一行,我都会得到添加到全局生产队列中的任务。我可以有8条线。每个get任务请求GetTasks(y.LineCode)花费1分钟。我想使用并行,以确保我请求我的8个调用在一起,而不是一个接一个。

我该怎么办?

使用另一个ForEach循环还是使用另一个扩展方法?有ForEachAsync吗?使GetTasks请求本身异步?

EN

回答 2

Stack Overflow用户

发布于 2019-09-26 15:34:04

并行性不是并发性。并发性不是异步的。并行运行多个慢速查询并不能使它们运行得更快,恰恰相反。这些都是不同的问题,需要非常不同的解决办法。没有具体的问题,人们只能给出一般性的建议。

并行-处理一个800 K的项数组

并行性意味着使用多个核并行处理大量数据。为此,您需要对数据进行分区,并将每个分区提供给"worker“进行处理。您需要最小化工作人员之间的通信和同步的需要,以获得最佳性能,否则您的工作人员将花费CPU时间什么都不做。这意味着没有全局队列更新。

如果您有很多行,或者行处理是CPU绑定的,您可以使用PLINQ来处理它:

代码语言:javascript
复制
var query = from y in lines.AsParallel()
            from t in GetTasks(y.LineCode)
            select t;

var theResults=query.ToList();

就这样。无需通过锁定或使用并发集合来同步对队列的访问。这将使用所有可用的核心。您可以添加WithDegreeOfParallelism()以减少用于避免冻结的岩心数量。

并发-调用2000服务器

另一方面,并发意味着同时做几件不同的事情。不涉及分区。

例如,如果我必须查询8或2000服务器来监视数据(真实情况),我就不会使用Parallel或PLINQ。首先,Parallel和PLINQ使用所有可用的内核。在这种情况下,尽管他们什么也不做,他们只会等待回应。并行类也不能处理异步方法,因为没有意义--它们不应该等待响应。

一个非常快速和肮脏的解决方案是启动多个任务并等待它们返回,例如:

代码语言:javascript
复制
var tasks=lines.Select(y=>Task.Run(()=>GetTasks(y.LineCode));
//Array of individual results
var resultsArray=await Task.WhenAll(tasks);

//flatten the results
var resultList=resultsArray.SelectMany(r=>r).ToList();

这将立即启动所有请求。网络安全不喜欢2000并发请求,因为它看起来像是黑客攻击,并造成了一些网络洪流。

与数据流并发

我们可以使用TPL数据流库和例如ActionBlock或TransformBlock来以受控的并行度发出请求:

代码语言:javascript
复制
var options=new ExecutionDataflowBlockOptions { 
                    MaxDegreeOfParallelism = 4 ,
                    BoundedCapacity=10,
            };
var spamBlock=new TransformManyBlock<Line,Result>(
                               y=>GetTasks(y.LineCode),
                               options);
var outputBlock=new BufferBlock<Result>();
spamBlock.LinkTo(outputBlock);

foreach(var line in lines)
{
    await spamBlock.SendAsync(line);
}
spamBlock.Complete();
//Wait for all 4 workers to finish
await spamBlock.Completion;

一旦spamBlock完成,就可以在outputBlock中找到结果。通过设置一个BoundedCapacity,我确保如果spamBlock的输入队列中有太多未处理的消息,那么that循环将等待。

ActionBlock也可以处理异步方法。假设GetTasksAsync返回我们可以使用的Task<Result[]>

代码语言:javascript
复制
var spamBlock=new TransformManyBlock<Line,Result>(
                               y=>GetTasksAsync(y.LineCode),
                               options);
票数 3
EN

Stack Overflow用户

发布于 2019-09-26 14:10:41

您可以使用平行Foreach

代码语言:javascript
复制
        Parallel.ForEach(Lines, (line) =>
        {
             globalQueue.AddRange(GetTasks(line.LineCode).ToList());
        });

Parallel.ForEach循环的工作方式类似于Parallel.For循环。循环根据系统环境划分源集合并调度多个线程上的工作。系统上的处理器越多,并行方法的运行速度就越快。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58118593

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档