我对C#并不完全陌生,但我对这种语言还不太熟悉,不知道如何做我需要做的事情。
我有一个文件,叫它File1.txt。has有10万行左右。我将复制File1.txt并将其称为File1_untested.txt。我还将为文件中的每一行创建一个空文件"Successes.txt“:
所以,我的问题是,我怎么能多线程这个?
到目前为止,我的方法是创建对象(LineChecker),让对象检查它的行,并将对象传递到ThreadPool中。我了解如何使用ThreadPools来执行CountdownEvent中的一些任务。然而,一次排10万项任务似乎是不合理的。我怎样才能逐渐给游泳池喂食呢?一次大概1000行之类的。
另外,我需要确保没有两个线程同时添加到Successes.txt或从File1_untested.txt中删除。我可以用锁()来处理这件事,对吗?我应该把什么传给锁()?我可以使用LineChecker的静态成员吗?
我只是想更广泛地了解这样的东西是如何设计的。
发布于 2015-09-26 20:52:09
由于测试需要相当长的时间,所以使用多个CPU核是有意义的。但是,这种利用只应用于相对昂贵的测试,而不应用于读取/更新文件。这是因为读取/更新文件相对便宜。
下面是一些您可以使用的示例代码:
假设您有一个相对昂贵的测试方法:
private bool Test(string line)
{
//This test is expensive
}下面是一个代码示例,可以利用多个CPU进行测试:
在这里,我们将集合中的项目数限制为10,这样从文件读取的线程将等待其他线程赶上,然后再从文件中读取更多行。
这个输入线程的读取速度将远远快于其他线程所能测试的速度,因此在最坏的情况下,我们将比测试线程多读10行。这确保了我们有良好的内存消耗。
CancellationTokenSource cancellation_token_source = new CancellationTokenSource();
CancellationToken cancellation_token = cancellation_token_source.Token;
BlockingCollection<string> blocking_collection = new BlockingCollection<string>(10);
using (StreamReader reader = new StreamReader(new FileStream(filename, FileMode.Open, FileAccess.Read)))
{
using (
StreamWriter writer =
new StreamWriter(new FileStream(success_filename, FileMode.OpenOrCreate, FileAccess.Write)))
{
var input_task = Task.Factory.StartNew(() =>
{
try
{
while (!reader.EndOfStream)
{
if (cancellation_token.IsCancellationRequested)
return;
blocking_collection.Add(reader.ReadLine());
}
}
finally //In all cases, even in the case of an exception, we need to make sure that we mark that we have done adding to the collection so that the Parallel.ForEach loop will exit. Note that Parallel.ForEach will not exit until we call CompleteAdding
{
blocking_collection.CompleteAdding();
}
});
try
{
Parallel.ForEach(blocking_collection.GetConsumingEnumerable(), (line) =>
{
bool test_reault = Test(line);
if (test_reault)
{
lock (writer)
{
writer.WriteLine(line);
}
}
});
}
catch
{
cancellation_token_source.Cancel(); //If Paralle.ForEach throws an exception, we inform the input thread to stop
throw;
}
input_task.Wait(); //This will make sure that exceptions thrown in the input thread will be propagated here
}
}发布于 2015-09-26 20:54:53
如果您的“测试”是快速的,那么多线程将不会给您带来任何优势,因为您的代码将100%绑定到磁盘上,并且假定您的所有文件都在同一个磁盘上:您无法通过多线程来提高单个磁盘的吞吐量。
但是,由于您的“测试”将等待来自test服务器的响应,这意味着测试将是缓慢的,因此通过多线程有很大的改进空间。基本上,您需要的线程数量取决于can服务器可以同时服务多少请求,而不会降低can服务器的性能。这个数字可能仍然很低,所以你可能最终什么也得不到,但至少你可以尝试。
如果您的文件不是很大,那么您可以一次全部读取,然后一次全部写入。如果每一行只有80个字符,那么这意味着您的文件只有8兆字节,这是微不足道的,所以您可以将所有行读入一个列表,处理列表,生成另一个列表,最后写出整个列表。
这将允许您创建一个结构,例如,MyLine,它包含每一行的索引和每一行的文本,这样您就可以在编写所有行之前对它们进行排序,这样您就不必担心服务器的无序响应。
然后,您需要做的是像@Paul建议的那样使用一个边界阻塞队列( BlockingCollection )。
BlockingCollection接受它的最大容量作为构造函数参数。这意味着,一旦达到了其最大容量,任何向其添加的进一步尝试都会被阻止(调用方坐在那里等待),直到某些项被删除。因此,如果您希望有多达10个同时挂起的请求,您可以按照以下方式构造它:
var sourceCollection = new BlockingCollection<MyLine>(10);您的主线程将使用sourceCollection对象填充MyLine,并且将有10个线程阻止从集合中读取MyLine。每个线程向服务器发送一个请求,等待响应,将结果保存到线程安全的resultCollection中,并尝试从sourceCollection获取下一个条目。
您可以使用C#的async特性,而不是使用多线程,但我对它们并不十分熟悉,因此我无法确切地建议您如何做到这一点。
最后,将resultCollection的内容复制到List中,对列表进行排序,并将其写入输出文件。(将副本复制到单独的List中可能是个好主意,因为排序线程安全的resultCollection可能比排序非线程安全的List要慢得多。我说大概吧。)
https://stackoverflow.com/questions/32801639
复制相似问题