我有一个FileSystemWatcher,它在文件夹中查找文件。一旦创建了文件,就会触发创建的事件。我将每个创建的文件名添加到一个队列中。
void Main(){
FileSystemWatcher fsw = new FileSystemWatcher();
fsw.Path = System.Configuration.ConfigurationManager.AppSettings["PathToDataFolder"];
//fsw.Filter = "*.docx";
fsw.EnableRaisingEvents = true;
fsw.IncludeSubdirectories = true;
fsw.Created += new FileSystemEventHandler(fsw_Created);
}
private void fsw_Created(object sender, FileSystemEventArgs e)
{
queue.Enqueue(e.FullPath);
}超时,文件将增加,队列将变得很大。所以队列是动态的。我想并行处理每个文件。但是我不想一次处理很多文件,因为这是相当耗费资源的。一旦一个文件被处理,我想把它排掉,然后选择另一个要处理的文件。
我如何在C#中实现这一点?
发布于 2018-09-10 15:18:20
您可以使用具有可配置并行度的ActionBlock。默认情况下,ActionBlock只使用一个任务来处理传入的消息。您可以让它使用多任务并行处理文件。FSW的事件应该将路径直接张贴到块:
ActionBlock<string> _block;
void Main()
{
...
var options= new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
};
_block=new ActionBlock<string>(path=>MyPathProcessingFunction(path), options);
//Configure the FSW as before
}
private void fsw_Created(object sender, FileSystemEventArgs e)
{
_block.Post(e.FullPath);
}发布于 2018-09-10 15:03:48
您可以通过使用生产者/消费者模式来实现这一点。在.Net中,BlockingCollection类提供对此模式的支持。每当事件处理程序被触发时,它都会向队列添加路径,并创建一个处理队列的新任务。因此,对于每个被监视的文件,都会创建一个新任务。如果需要,可以更改任务创建策略,还可以管理TaskScheduler如何调度任务创建策略。
public class Watcher
{
public Watcher()
{
_queue = new BlockingCollection<string>();
}
private BlockingCollection<string> _queue;
public void Start()
{
FileSystemWatcher fsw = new FileSystemWatcher();
fsw.Path = @"F:\a";
fsw.EnableRaisingEvents = true;
fsw.IncludeSubdirectories = true;
fsw.Created += Fsw_Created;
}
private void Fsw_Created(object sender, FileSystemEventArgs e)
{
_queue.Add(e.FullPath);
Task.Factory.StartNew(() =>
{
var path = _queue.Take();
// process the queue here
});
}
}发布于 2018-09-10 15:06:42
您正在寻找生产者/消费者模式,它在C#中可以通过BlockingCollection实现。
private static async Task Perform() {
// Be careful with this parameter: what do you expect the system
// to do if the pipeline contains pipelineMaxLength items?
int pipelineMaxLength = 100;
int consumersCount = 10;
using (BlockingCollection<string> pipeline =
new BlockingCollection<string>(pipelineMaxLength)) {
// Producer(s)
using (FileSystemWatcher fsw = new FileSystemWatcher()) {
...
fsw.Created += (s, e) => {
// whenever new file has been created, add it to the pipeline
if (!pipeline.IsAddingCompleted)
pipeline.Add(e.FullPath);
// Whenever you have no files to add and you want quit processing call
// pipeline.CompleteAdding();
};
// Consumers (consumersCount of them are working in parallel)
var consumers = Enumerable
.Range(0, consumersCount) //
.Select(index => Task.Run(() => {
// each consumer extracts file from the pipeline and processes it
foreach (var file in pipeline.GetConsumingEnumerable()) {
//TODO: process the file here
} }))
.ToArray();
// (a)wait until all the consumers finish their work
await Task
.WhenAll(consumers);
}
}
}https://stackoverflow.com/questions/52260232
复制相似问题