我有一个大文件夹结构,我试图从一个共享驱动器下载。共享驱动器是缓慢的,但它也有几个镜像。为了加速这一过程,我正在尝试制作一个小型下载程序,它可以管理与所有慢镜像的并行连接。各个文件将从不同的镜像中下载。我也希望能够限制一次连接到每个镜像的线程数量。(这个问题已经存在了吗?那我就不用写任何代码了!不过,我看上去的确如此。
这似乎是一个Dataflow用例,虽然我对Dataflow非常陌生,所以我并不乐观。我一开始是这样的:
var buffer = new BufferBlock<string>();
var blockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = threadsPerPath
};
IEnumerable<ActionBlock<string>> blocks = mirrors.Select(basePath =>
{
return new ActionBlock<string>(
file => {
string destinationFile = Path.Combine(destination, file);
Directory.CreateDirectory(Path.GetDirectoryName(destinationFile));
File.Copy(Path.Combine(basePath, file), destinationFile);
},
blockOptions);
});
foreach (ActionBlock<string> block in blocks)
{
buffer.LinkTo(block);
}
await Task.Run(() =>
{
string top = mirrors[0];
int baseLength = top.Length;
IEnumerable<string> allFiles = Directory.EnumerateFiles(top, "*", SearchOption.AllDirectories);
foreach (string path in allFiles)
{
buffer.Post(path[baseLength..]);
}
buffer.Complete();
});(我打算和threadsPerPath一起玩。不确定是否会看到并行访问同一个镜像的好处。)运行时,这只使用第一个镜像-据我所能告诉ActionBlocks的其他镜像永远得不到数据。我想这是故意的,但我不知道怎么做。如何使多个ActionBlocks并行地处理同一个缓冲区,其中缓冲区中的每个项只处理其中一个ActionBlocks?
发布于 2022-09-20 16:43:58
要实现负载平衡,需要限制链接块的输入容量。如果要将请求扩展到所有块,则必须限制它们的BoundedCapacity,甚至限制为1或至少与MaxDegreeOfParallelism相同的数字
var blockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = threadsPerPath,
BoundedCapacity=bound
};原始代码的问题是,所有ActionBlock的输入缓冲区都是无限的,因此所有消息(文件)都在第一个块中结束。
块的输出被发送到第一个可用的链接块。如果该块具有无限缓冲区,则所有消息都将发送到同一块。通过将所有块的输入缓冲区限制为1项,可以强制将每条消息(在本例中为文件)发送到不同的块。
消息发布后,您可以等待所有块完成以下操作:
await Task.WhenAll(blocks.Select(b=>b.Completion));https://stackoverflow.com/questions/73588039
复制相似问题