我需要从一个文件中读取数据,对其进行处理,然后将结果写入另一个文件。我使用 BackgroundWorker 来显示处理进度。我编写了类似下面的代码,在 BackgroundWorker 的 DoWork 事件中使用:
/// <summary>
/// Copies <paramref name="fileToRead"/> into <paramref name="fileToWrite"/> one chunk
/// at a time, passing every chunk through <c>Proc</c> before it is written.
/// </summary>
/// <param name="fileToRead">Path of the source file; it must already exist.</param>
/// <param name="fileToWrite">
/// Path of the destination file. It is opened with <see cref="FileMode.Open"/>, so it
/// must already exist; the caller is expected to have created it with the same size
/// as the source.
/// </param>
private void ProcData(string fileToRead,string fileToWrite)
{
    using (var input = new FileStream(fileToRead, FileMode.Open))
    using (var reader = new BinaryReader(input))
    using (var output = new FileStream(fileToWrite, FileMode.Open))
    using (var writer = new BinaryWriter(output))
    {
        // The source length bounds the loop; progress is tracked by the writer position.
        long totalBytes = new FileInfo(fileToRead).Length;
        int chunkSize = 4 * 1024;

        while (true)
        {
            long remaining = totalBytes - output.Position;
            if (remaining <= 0)
            {
                break;
            }

            // Shrink the request for the final, partial chunk.
            if (remaining < chunkSize)
            {
                chunkSize = (int)remaining;
            }

            // read
            byte[] chunk = reader.ReadBytes(chunkSize);
            // process (Proc is defined elsewhere; presumably it transforms the array in place)
            Proc(chunk);
            // write
            writer.Write(chunk);

            // The next request is sized after what was actually returned.
            chunkSize = chunk.Length;

            // report if percentage changed
            //...
        }
    }
}
但是它比从fileToRead读取和写入fileToWrite慢5倍,所以我考虑线程处理。我在网站上读到了一些问题,并在this question上尝试了这样的基础。
/// <summary>
/// Splits <paramref name="fileToRead"/> into <c>threadNumber</c> consecutive parts and
/// processes every part on its own task via <c>Proc2</c>, then blocks until all parts
/// are done.
/// </summary>
/// <param name="fileToRead">Path of the source file.</param>
/// <param name="fileToWrite">Path of the destination file, passed through to <c>Proc2</c>.</param>
private void ProcData2(string fileToRead, string fileToWrite)
{
    const int threadNumber = 4; // for example
    const int blockSize = 4 * 1024; // matches the buffer size used inside Proc2

    // Divide the file into threadNumber parts. Part boundaries are kept on whole
    // blocks so every chunk handed to Proc is the same chunk a sequential pass
    // would produce; the last part absorbs the remainder.
    long fileLength = new FileInfo(fileToRead).Length;
    long totalBlocks = (fileLength + blockSize - 1) / blockSize;
    long bytesPerPart = (totalBlocks / threadNumber) * blockSize;

    Task[] tasks = new Task[threadNumber];
    for (int i = 0; i < threadNumber; i++)
    {
        // Per-iteration locals: the lambda must not capture the loop variable i,
        // which is shared by all iterations and has usually moved on (or reached
        // threadNumber) by the time the task actually runs.
        long partStart = i * bytesPerPart;
        long partLength = (i == threadNumber - 1) ? fileLength - partStart : bytesPerPart;

        tasks[i] = Task.Factory.StartNew(() =>
        {
            Proc2(fileToRead, fileToWrite, partStart, partLength);
        });
    }

    // Every slot of tasks is filled above, so it is safe to wait on the whole array.
    Task.WaitAll(tasks);
}
//
/// <summary>
/// Processes one segment of the file: reads the bytes in
/// [<paramref name="fileStartByte"/>, <paramref name="fileStartByte"/> + <paramref name="partLength"/>)
/// from <paramref name="fileToRead"/>, passes each chunk through <c>Proc</c>, and writes
/// it at the same offset in <paramref name="fileToWrite"/>.
/// </summary>
/// <param name="fileToRead">Path of the source file; opened read-only with shared read access.</param>
/// <param name="fileToWrite">
/// Path of the destination file; opened write-only with shared write access. It must
/// already exist because it is opened with <see cref="FileMode.Open"/>.
/// </param>
/// <param name="fileStartByte">Offset of the first byte of the segment.</param>
/// <param name="partLength">Number of bytes in the segment.</param>
private void Proc2(string fileToRead,string fileToWrite,long fileStartByte,long partLength)
{
    long partEnd = fileStartByte + partLength;
    int chunkSize = 4 * 1024;

    using (var input = new FileStream(fileToRead, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (var reader = new BinaryReader(input))
    using (var output = new FileStream(fileToWrite, FileMode.Open, FileAccess.Write, FileShare.Write))
    using (var writer = new BinaryWriter(output))
    {
        // Both streams start at the beginning of this segment.
        input.Seek(fileStartByte, SeekOrigin.Begin);
        output.Seek(fileStartByte, SeekOrigin.Begin);

        while (true)
        {
            long remaining = partEnd - output.Position;
            if (remaining <= 0)
            {
                break;
            }

            // Shrink the request for the final, partial chunk of the segment.
            if (remaining < chunkSize)
            {
                chunkSize = (int)remaining;
            }

            // read
            byte[] chunk = reader.ReadBytes(chunkSize);
            // process (Proc is defined elsewhere; presumably it transforms the array in place)
            Proc(chunk);
            // write
            writer.Write(chunk);

            // The next request is sized after what was actually returned.
            chunkSize = chunk.Length;

            // report if percentage changed
            //...
        }
    }
}
但我认为它有一些问题,每一次切换任务,它需要再次寻找。我考虑读取文件,对Proc()使用线程,然后编写结果,但这似乎是错误的。如何正确地执行此操作?(从文件中读取缓冲区,通过使用任务处理并在其他文件上写入)
//===================================================================
根据 Pete Kirkham 的回答,我修改了我的方法。我不知道为什么,但这对我不起作用。我把新方法补充在这里,也许能帮到其他人。谢谢大家。
/// <summary>
/// Copies <paramref name="fileToRead"/> into <paramref name="fileToWrite"/> in batches:
/// all file I/O stays on the calling thread, while the chunks of one batch are passed
/// through <c>Proc</c> in parallel, one task per chunk.
/// </summary>
/// <param name="fileToRead">Path of the source file; it must already exist.</param>
/// <param name="fileToWrite">
/// Path of the destination file. It is opened with <see cref="FileMode.Open"/>, so it
/// must already exist; the caller is expected to have created it with the same size
/// as the source.
/// </param>
private void ProcData3(string fileToRead, string fileToWrite)
{
    const int bufferSize = 4 * 1024;
    const int threadNumber = 4; // example

    // Both lists are reset for every batch, so a batch never sees chunks or tasks
    // left over from the previous one.
    var bufferPool = new List<byte[]>(threadNumber);
    var tasks = new List<Task>(threadNumber);

    using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open))
    using (BinaryReader binaryReader = new BinaryReader(streamReader))
    using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open))
    using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
    {
        while (true)
        {
            bufferPool.Clear();
            tasks.Clear();

            // read: up to threadNumber chunks. ReadBytes returns fewer bytes than
            // requested only at the end of the stream, so a short or empty chunk
            // ends the batch.
            for (int g = 0; g < threadNumber; g++)
            {
                byte[] chunk = binaryReader.ReadBytes(bufferSize);
                if (chunk.Length == 0)
                {
                    break;
                }

                bufferPool.Add(chunk);
                if (chunk.Length < bufferSize)
                {
                    break;
                }
            }

            if (bufferPool.Count == 0)
            {
                break; // nothing left to read
            }

            // process: one task per chunk actually read, so there are never null or
            // stale entries to wait on.
            foreach (byte[] chunk in bufferPool)
            {
                byte[] current = chunk; // private copy for the closure
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    Proc(current);
                }));
            }

            // wait until the whole batch has been processed
            Task.WaitAll(tasks.ToArray());

            // write: in the order the chunks were read
            foreach (byte[] chunk in bufferPool)
            {
                binaryWriter.Write(chunk);
            }

            // report if percentage changed
            //...
        }
    }
}
发布于 2016-07-06 09:17:16
本质上,您希望将数据处理拆分为并行任务,但不希望将IO拆分。
具体如何实现取决于数据的大小。如果数据足够小,可以全部放入内存,那么您可以将其全部读入一个输入数组并创建一个输出数组,然后创建若干任务,每个任务处理输入数组的一部分并填充输出数组的对应部分,最后将整个输出数组写入文件。
如果数据量太大,则需要限制每次读取和写入的数据量。因此,由主流程先读取 N 个数据块并创建 N 个任务来处理它们。然后按顺序等待任务完成;每当一个任务完成,就写出它的输出块,再读取一个新的输入块并创建一个新任务。N 和块大小需要通过一些实验来确定合适的值,使任务的完成速度与 IO 的速度大致相当。
https://stackoverflow.com/questions/38218692
复制相似问题