首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >并行处理项

并行处理项
EN

Stack Overflow用户
提问于 2018-09-10 14:39:17
回答 3查看 131关注 0票数 0

我有一个FileSystemWatcher,它在文件夹中查找文件。一旦创建了文件,就会触发创建的事件。我将每个创建的文件名添加到一个队列中。

代码语言:javascript
复制
void Main(){
            FileSystemWatcher fsw = new FileSystemWatcher();
            fsw.Path = System.Configuration.ConfigurationManager.AppSettings["PathToDataFolder"];
            //fsw.Filter = "*.docx";
            fsw.EnableRaisingEvents = true;
            fsw.IncludeSubdirectories = true;
            fsw.Created += new FileSystemEventHandler(fsw_Created);
}
private void fsw_Created(object sender, FileSystemEventArgs e)
{
            queue.Enqueue(e.FullPath);    
}

超时,文件将增加,队列将变得很大。所以队列是动态的。我想并行处理每个文件。但是我不想一次处理很多文件,因为这是相当耗费资源的。一旦一个文件被处理,我想把它排掉,然后选择另一个要处理的文件。

我如何在C#中实现这一点?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-09-10 15:18:20

您可以使用具有可配置并行度的ActionBlock。默认情况下,ActionBlock只使用一个任务来处理传入的消息。您可以让它使用多任务并行处理文件。FSW的事件应该将路径直接张贴到块:

代码语言:javascript
复制
ActionBlock<string> _block;

void Main()
{

    ...
    var options= new ExecutionDataflowBlockOptions
     {
        MaxDegreeOfParallelism = 4
     };

    _block=new ActionBlock<string>(path=>MyPathProcessingFunction(path), options);

   //Configure the FSW as before
}

private void fsw_Created(object sender, FileSystemEventArgs e)
{
    _block.Post(e.FullPath);
}
票数 3
EN

Stack Overflow用户

发布于 2018-09-10 15:03:48

您可以通过使用生产者/消费者模式来实现这一点。在.Net中,BlockingCollection类提供对此模式的支持。每当事件处理程序被触发时,它都会向队列添加路径,并创建一个处理队列的新任务。因此,对于每个被监视的文件,都会创建一个新任务。如果需要,可以更改任务创建策略,还可以管理TaskScheduler如何调度任务创建策略。

代码语言:javascript
复制
public class Watcher
{
    public Watcher()
    {
        _queue = new BlockingCollection<string>();
    }

    private BlockingCollection<string> _queue;

    public void Start()
    {
        FileSystemWatcher fsw = new FileSystemWatcher();
        fsw.Path = @"F:\a";
        fsw.EnableRaisingEvents = true;
        fsw.IncludeSubdirectories = true;
        fsw.Created += Fsw_Created;
    }

    private void Fsw_Created(object sender, FileSystemEventArgs e)
    {
        _queue.Add(e.FullPath);

        Task.Factory.StartNew(() =>
        {
            var path = _queue.Take();
            // process the queue here
        });
    }
}
票数 1
EN

Stack Overflow用户

发布于 2018-09-10 15:06:42

您正在寻找生产者/消费者模式,它在C#中可以通过BlockingCollection实现。

代码语言:javascript
复制
 private static async Task Perform() {
   // Be careful with this parameter: what do you expect the system
   // to do if the pipeline contains pipelineMaxLength items?
   int pipelineMaxLength = 100;
   int consumersCount = 10;

   using (BlockingCollection<string> pipeline = 
     new BlockingCollection<string>(pipelineMaxLength)) {

     // Producer(s)
     using (FileSystemWatcher fsw = new FileSystemWatcher()) {
       ...
       fsw.Created += (s, e) => {
         // whenever new file has been created, add it to the pipeline 
         if (!pipeline.IsAddingCompleted)
           pipeline.Add(e.FullPath);

         // Whenever you have no files to add and you want quit processing call
         // pipeline.CompleteAdding();
       };

       // Consumers (consumersCount of them are working in parallel)
       var consumers = Enumerable
        .Range(0, consumersCount) // 
        .Select(index => Task.Run(() => {
           // each consumer extracts file from the pipeline and processes it
           foreach (var file in pipeline.GetConsumingEnumerable()) {
             //TODO: process the file here 
           } }))
        .ToArray();

       // (a)wait until all the consumers finish their work
       await Task
         .WhenAll(consumers);
     }
   }
 }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52260232

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档