我想知道是否可以优化以下代码以更快地执行。我目前似乎在一个相当简单的数据流结构上,每秒最多可以发送大约140万条简单消息。我知道这个示例流程同步地传递/转换消息,但是,我目前测试TPL Dataflow作为我自己的基于任务和并发收集的自定义解决方案的可能替代方案。我知道“并发”这一术语已经表明我是并行运行的,但为了目前的测试目的,我在自己的解决方案上同步推送消息,我达到了大约每秒510万条消息。我在这里遗漏了什么,我读到TPL Dataflow是作为一个高吞吐量、低延迟的解决方案被推送的,但到目前为止,我必须忽略性能调整。谁能告诉我正确的方向?
class TPLDataFlowExperiments
{
public TPLDataFlowExperiments()
{
var buf1 = new BufferBlock<int>();
var transform = new TransformBlock<int, string>(t =>
{
return "";
});
var action = new ActionBlock<string>(s =>
{
//Thread.Sleep(100);
//Console.WriteLine(s);
});
buf1.LinkTo(transform);
transform.LinkTo(action);
//Propagate all Completions down the flow
buf1.Completion.ContinueWith(t =>
{
transform.Complete();
transform.Completion.ContinueWith(u =>
{
action.Complete();
});
});
Stopwatch watch = new Stopwatch();
watch.Start();
int cap = 10000000;
for (int i = 0; i < cap; i++)
{
buf1.Post(i);
}
//Mark Buffer as Complete
buf1.Complete();
action.Completion.ContinueWith(t =>
{
watch.Stop();
Console.WriteLine("All Blocks finished processing");
Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000);
});
Console.ReadLine();
}
}发布于 2012-06-22 21:07:52
我认为这主要归结为一件事:你的测试几乎没有意义。所有这些块都应该做一些事情,并使用多核和异步操作来做这件事。
此外,在您的测试中,很可能在同步上花费了大量时间。使用更实际的代码,代码将需要一些时间来执行,因此将有较少的争用,因此实际开销将小于您测量的开销。
但实际上,为了回答你的问题,是的,你忽略了一些性能调整。具体地说,SingleProducerConstrained,这意味着可以使用锁定较少的数据结构。如果我在两个模块上都使用这个( BufferBlock在这里完全没有用,你可以安全地删除它),在我的计算机上,速度从每秒300-400万个项目增加到超过500万个项目。
发布于 2012-06-22 22:21:52
为了补充svick的答案,测试仅对单个操作块使用单个处理线程。这样,它只测试使用这些块的开销。
DataFlow的工作方式类似于F#代理、Scala参与者和MPI实现。每个动作块一次执行一个任务,侦听输入并产生输出。通过将算法分解成可以在多个内核上独立执行的步骤来提供加速,只相互传递消息。
虽然您可以增加并发任务的数量,但最重要的问题是设计一个独立于其他步骤执行最大数量的步骤的流。
发布于 2013-12-04 05:17:26
您还可以增加数据流块的并行度。这可能会提供额外的加速,并且还可以帮助在线性任务之间实现负载平衡,如果您发现其中一个块是其余任务的瓶颈。
https://stackoverflow.com/questions/11155085
复制相似问题