首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在c#中创建一个“失忆”异步FIFO队列?

如何在c#中创建一个“失忆”异步FIFO队列?
EN

Stack Overflow用户
提问于 2020-12-30 13:33:15
回答 1查看 301关注 0票数 0

我正在尝试异步处理文档。其想法是用户将文档发送到服务,这需要时间,并将在稍后查看结果(每个文档大约20-90秒)。

理想情况下,我只想填充一些可以观察到的集合,系统将尽可能快地将其清空。当存在一个项时,处理它并在另一个对象中生成预期的输出,而当没有项时,只需什么都不做。当用户检查输出集合时,他将找到已经处理的项。

理想情况下,所有项目从一开始都是可见的,并且有一个状态(已完成、正在进行或处于队列中),但是一旦我知道如何做第一个,我应该能够处理状态。

我不知道该使用哪个对象,现在我正在查看BlockingCollection,但我不认为它适合这项工作,因为我无法在它从另一端空出来的时候填充它。

代码语言:javascript
复制
       private BlockingCollection<IDocument> _jobs = new BlockingCollection<IDocument>();
       public ObservableCollection<IExtractedDocument> ExtractedDocuments { get; }

       public QueueService()
       {
           ExtractedDocuments = new ObservableCollection<IExtractedDocument>();
       }
       
       public async Task Add(string filePath, List<Extra> extras)
       {
           if (_jobs.IsAddingCompleted || _jobs.IsCompleted)
               _jobs = new BlockingCollection<IDocument>();
     
           var doc = new Document(filePath, extras);
           _jobs.Add(doc);
           _jobs.CompleteAdding();
           
           await ProcessQueue();
       }

       private async Task ProcessQueue()
       {
           foreach (var document in _jobs.GetConsumingEnumerable(CancellationToken.None))
           {
               var resultDocument = await service.ProcessDocument(document);
               ExtractedDocuments.Add(resultDocument );
               Debug.WriteLine("Job completed");
           }
       }

我现在就是这样处理的。如果删除CompleteAdding调用,它将挂起第二次尝试。如果我有那句话,那我就不能只是排满队,我得先把它空出来,这就违背了我的目的。

有什么办法让我得到我想要达到的目标吗?一个我将填充的集合,系统将异步地、自主地处理?

总之,我需要:

  • :我可以填充的集合,它将被逐步和异步地处理。可以在处理某些文档时添加文档、系列或文档。
  • 是在进程完成后将填充的输出集合,用户界面线程和应用程序在运行
  • 时仍然可以响应,我不需要并行拥有多个进程,或者一次只需要一个文档。最容易安装和维护的就可以了(小规模的应用程序)。我假设一次一个更简单。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-30 15:08:27

这里的一个常见模式是有一个回调方法,该方法在文档状态更改时执行。随着后台任务的运行,它将尽可能快地咀嚼抛出的文档。调用Dispose关闭处理器。

如果您需要在gui线程上处理回调,那么您需要对主线程的回调进行同步处理。如果您正在使用Windows窗体,则可以使用该方法来执行此操作。

这个示例程序实现了所有必要的类和接口,您可以根据需要微调和调整内容。

代码语言:javascript
复制
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    class Program
    {
        private static Task Callback(IExtractedDocument doc, DocumentProcessor.DocState docState)
        {
            Console.WriteLine("Processing doc {0}, state: {1}", doc, docState);
            return Task.CompletedTask;
        }

        public static void Main()
        {
            using DocumentProcessor docProcessor = new DocumentProcessor(Callback);
            Console.WriteLine("Processor started, press any key to end processing");
            for (int i = 0; i < 100; i++)
            {
                if (Console.KeyAvailable)
                {
                    break;
                }
                else if (i == 5)
                {
                    // make an error
                    docProcessor.Add(null);
                }
                else
                {
                    docProcessor.Add(new Document { Text = "Test text " + Guid.NewGuid().ToString() });
                }
                Thread.Sleep(500);
            }
            Console.WriteLine("Doc processor shut down, press ENTER to quit");
            Console.ReadLine();
        }

        public interface IDocument
        {
            public string Text { get; }
        }

        public class Document : IDocument
        {
            public string Text { get; set; }
        }

        public interface IExtractedDocument : IDocument
        {
            public IDocument OriginalDocument { get; }
            public Exception Error { get; }
        }

        public class ExtractedDocument : IExtractedDocument
        {
            public override string ToString()
            {
                return $"Orig text: {OriginalDocument?.Text}, Extracted Text: {Text}, Error: {Error}";
            }

            public IDocument OriginalDocument { get; set; }

            public string Text { get; set; }

            public Exception Error { get; set; }
        }

        public class DocumentProcessor : IDisposable
        {
            public enum DocState { Processing, Completed, Error }

            private readonly BlockingCollection<IDocument> queue = new BlockingCollection<IDocument>();
            private readonly Func<IExtractedDocument, DocState, Task> callback;
            private CancellationTokenSource cancelToken = new CancellationTokenSource();

            public DocumentProcessor(Func<IExtractedDocument, DocState, Task> callback)
            {
                this.callback = callback;
                Task.Run(() => StartQueueProcessor()).GetAwaiter();
            }

            public void Dispose()
            {
                if (!cancelToken.IsCancellationRequested)
                {
                    cancelToken.Cancel();
                }
            }

            public void Add(IDocument doc)
            {
                if (cancelToken.IsCancellationRequested)
                {
                    throw new InvalidOperationException("Processor is disposed");
                }
                queue.Add(doc);
            }

            private void ProcessDocument(IDocument doc)
            {
                try
                {
                    // do processing
                    DoCallback(new ExtractedDocument { OriginalDocument = doc }, DocState.Processing);
                    if (doc is null)
                    {
                        throw new ArgumentNullException("Document to process was null");
                    }
                    IExtractedDocument successExtractedDocument = DoSomeDocumentProcessing(doc);
                    DoCallback(successExtractedDocument, DocState.Completed);
                }
                catch (Exception ex)
                {
                    DoCallback(new ExtractedDocument { OriginalDocument = doc, Error = ex }, DocState.Error);
                }
            }

            private IExtractedDocument DoSomeDocumentProcessing(IDocument originalDocument)
            {
                return new ExtractedDocument { OriginalDocument = originalDocument, Text = "Extracted: " + originalDocument.Text };
            }

            private void DoCallback(IExtractedDocument result, DocState docState)
            {
                if (callback != null)
                {
                    // send callbacks in background
                    callback(result, docState).GetAwaiter();
                }
            }

            private void StartQueueProcessor()
            {
                try
                {
                    while (!cancelToken.Token.IsCancellationRequested)
                    {
                        if (queue.TryTake(out IDocument doc, 1000, cancelToken.Token))
                        {
                            // can chance to Task.Run(() => ProcessDocument(doc)).GetAwaiter() for parallel execution
                            ProcessDocument(doc);
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // ignore, don't need to throw or worry about this
                }
                while (queue.TryTake(out IDocument doc))
                {
                    DoCallback(new ExtractedDocument { Error = new ObjectDisposedException("Processor was disposed") }, DocState.Error);
                }
            }
        }
    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65507807

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档