我想并行处理多个视频剪辑的应用程序一帧一帧。每个剪辑的每个帧的顺序是重要的(显然)。我决定使用,因为我认为这是数据流(电影帧就是数据)的一个很好的例子。
因此,我有一个从数据库加载帧的过程(比方说,在一批500帧中,所有这些都已打包)。
Example sequence:
|mid:1 fr:1|mid:1 fr:2|mid:2 fr:1|mid:3 fr:1|mid:1 fr:3|mid:2 fr:2|mid:2 fr:3|mid:1 fr:4|并将其发布到BufferBlock。对于这个BufferBlock,我已经将ActionBlocks与过滤器连接起来,使每个MovieID有一个ActionBlock,这样我就可以得到某种类型的数据分区。每个ActionBlock都是顺序的,但理想情况下,多个电影的多个ActionBlocks可以并行运行。
我确实有上面描述的网络工作,它确实并行运行,但根据我的计算,只有8到10个ActionBlocks同时执行。我对每个ActionBlock的运行时间进行了计时,其运行时间约为100至200‘s。我可以采取哪些步骤至少加倍并发?
我确实尝试过将操作委托转换为异步方法,并使数据库访问在ActionBlock操作委托中是异步的,但没有帮助。
编辑:I实现了额外的数据分区:带有奇数I的电影的帧在ServerA上处理,甚至电影的帧在ServerB上处理。应用程序的两个实例都访问同一个数据库。如果我的问题是DB,那么我将不会看到在处理的总帧数上有任何改进(或者很少,低于20%)。但我确实看到它加倍了。因此,这使我得出结论,线程池不会产生更多线程来并行执行更多帧(这两台服务器都是四核服务器,分析器每个应用程序显示大约25-30个线程)。
发布于 2012-11-15 19:00:33
一些假设:
ActionBlock实例是通用的;它们都调用相同的处理方法,您只需根据每个电影id创建一个它们的列表(预先有一个电影id列表),如下所示:// The movie IDs
IEnumerable<int> movieIds = ...;
// The actions.
var actions = movieIds.Select(
i => new { Id = i, Action = new ActionBlock<Frame>(MethodToProcessFrame) });
// The buffer block.
BufferBlock<Frame> buffer = ...;
// Link everything up.
foreach (var action in actions)
{
// Not necessary in C# 5.0, but still, good practice.
// The copy of the action.
var actionCopy = action;
// Link.
bufferBlock.LinkTo(actionCopy.Action, f => f.MovieId == actionCopy.Id);
}如果是这样的话,您将创建太多没有工作的ActionBlock<T>实例;因为您的帧(可能是电影)出现了故障,所以不能保证所有的ActionBlock<T>实例都有工作要做。
此外,当您创建一个ActionBlock<T>实例时,它将使用1的MaxDegreeOfParallelism创建,这意味着它是线程安全的,因为只有一个线程可以同时访问该块。
此外,TPL DataFlow库最终依赖于班级,默认情况下,班级在线程池上调度。线程池将在这里做一些事情:
ActionBlock<T>实例是饱和的,这是您应该关注的度量。处理电影的方法看起来也是通用的,从哪个电影中传入什么样的帧也不重要(如果这很重要,那么您就需要用它来更新您的问题,因为它会改变很多事情)。这也意味着它是线程安全的。
另外,如果可以假设一个帧的处理不依赖于任何以前的帧的处理(或者,它看起来像电影的帧按顺序排列),那么您可以使用单个ActionBlock<T>,但是可以调整MaxDegreeOfParallelism值,如下所示:
// The buffer block.
BufferBlock<Frame> buffer = ...;
// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
// This is where you tweak the concurrency:
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 4,
}
);
// Link. No filter needed.
bufferBlock.LinkTo(action);现在,您的ActionBlock<T>将始终饱和。当然,任何负责任的任务调度程序(默认情况下是线程池)仍将限制最大并发量,但它将尽可能多地同时执行。
为此,如果您的操作是真正的线程安全,您可以将MaxDegreeOfParallelism设置为DataflowBlockOptions.Unbounded,如下所示:
// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
// This is where you tweak the concurrency:
new ExecutionDataflowBlockOptions {
// We're thread-safe, let the scheduler determine
// how nuts we can go.
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
}
);当然,所有这些都假定其他一切都是最佳的(I/O、读/写等)。
发布于 2012-11-14 17:01:58
这很可能是并行化的最佳程度。线程池非常擅长于确定具有活动状态的实际线程的最佳数量。我猜想,您的硬件可以支持许多并行进程,实际上是并行工作的。如果添加的更多,实际上不会增加吞吐量,那么您只会花费更多的时间在线程之间进行上下文切换,从而减少实际处理线程的时间。
如果您注意到,在较长的一段时间内,您的CPU负载、内存总线、网络连接、磁盘访问等都低于容量,那么您可能会遇到问题,您可能想检查一下什么是瓶颈。有可能是某些资源处于它的容量范围内,而TPL已经认识到这一点,并确保它不会过度饱和该资源。
发布于 2012-11-14 18:52:18
我怀疑你是被组织起来的。问题是在哪里?在读或写上。你写的数据比读的多吗。CPU可能低于50%,因为它不能写得更快。
我并不是说ActionBlock是错的,但我会考虑BlockingCollection的生产者消费者。优化数据的读写方式。
这是不同的,但我有一个应用程序,我读块文字。解析文本,然后将单词写回SQL。我在单个线程上读取,然后并行解析,然后在单个线程上写入。我只在一个线程上写,这样就不会破坏索引。如果您是IO绑定的,您需要找出什么是最慢的IO,然后优化这个过程。
告诉我更多关于那个IO的事。
在您提到的问题中也提到了从数据库中读取。
我会给BlockingCollections一次机会。
BlockingCollection类
并且每个都有大小限制,这样你就不会浪费内存了。
让它变得足够大,这样它(几乎)就不会变成空的。
最慢步骤后的阻塞集合将变为空。如果您可以并行处理,那么就这样做。
我所发现的是,表中的并行插入并不是更快。
让一个过程锁定和保持它,并保持软管打开。
仔细看看你是如何插入的。
一排一排是慢的。
我使用TVP,一次插入10,000,但很多人喜欢Drapper或BulkInsert。
如果删除索引和触发器并插入,按聚集索引排序将是最快的。拿一张桌子拿着它。我得到10毫秒范围内的插入。
现在更新是最慢的。你看-你每次只做一排吗?
看一看用视频剪辑拍摄的画面和动作。
除非它是一个丑陋的更新,它不应该花费更长的时间在插入。
https://stackoverflow.com/questions/13383450
复制相似问题