首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用TaskCompletionSource

如何使用TaskCompletionSource
EN

Stack Overflow用户
提问于 2018-12-29 09:07:07
回答 1查看 289关注 0票数 0

我有一个多生产者和单一消费者的情况,我选择了一个共同的thread-safe资源,其中所有生产者Enqueue项目。但是,我不知道如何有效地使生产者await为新的项目时,Dequeue-ing从该资源。

POCO

代码语言:javascript
复制
struct Sample
    {
        public int Id { get; set; }
        public  double Value { get; set; }
    }

Producers

代码语言:javascript
复制
class ProducerGroup
    {
        StorageQueue sink;
        int producerGroupSize;

        public ProducerGroup(StorageQueue _sink,int producers)
        {
            this.sink = _sink;
            this.producerGroupSize = producers;
        }
        public void StartProducers()
        {
            Task[] producers = new Task[producerGroupSize];
            int currentProducer;
            for (int i = 0; i < producerGroupSize; i++)
            {
                currentProducer = i;
                producers[i] = Task.Run(async () =>
                  {
                      int cycle = 0;
                      while (true)
                      {
                          if (cycle > 5)
                          {
                              cycle = 0;
                          }
                          Sample localSample = new Sample { Id = currentProducer, Value = cycle++ };
                          await Task.Delay(1000);
                          this.sink.Enqueue(localSample);
                      }
                  });
            }

        }
    }

存储

代码语言:javascript
复制
class StorageQueue
    {
        private TaskCompletionSource<Sample> tcs;
        private object @lock = new object();

        private Queue<Sample> queue;

        public static StorageQueue CreateQueue(int?size=null)
        {
            return new StorageQueue(size);
        }

        public StorageQueue(int ?size)
        {
            if (size.HasValue)
            {
                this.queue = new Queue<Sample>(size.Value);
            }
            else
                this.queue = new Queue<Sample>();
        }

        public void Enqueue(Sample value)
        {
            lock (this.@lock)
            {
                this.queue.Enqueue(value);
                tcs = new TaskCompletionSource<Sample>();
                tcs.SetResult(this.queue.Peek());
            }
        }
        public async Task<Sample> DequeueAsync()
        {
            var result=await this.tcs.Task;
            this.queue.Dequeue();
            return result;
        }
    }

Consumer

代码语言:javascript
复制
class Consumer
    {
        private StorageQueue source;
        public Consumer(StorageQueue _source)
        {
            this.source = _source;
        }
        public async Task ConsumeAsync()
        {
            while (true)
            {
                Sample arrivedSample = await this.source.DequeueAsync();  //how should i implement this ?
                Console.WriteLine("Just Arrived");
            }
        }
    }

正如您在Storage's methodDequeuein aTaskso that i canawaitit in myConsumer. The only reason i usedTaskCompletionSourceis to be able to communicate between theDequeueandEnqueuemethods in theStorage`.类中看到的,我想包装我的Consumer

我不知道是否需要重新初始化tcs,但我想是这样做的,因为在每个Enqueue操作之后,我都想要一个新的Task

我还在tcs中重新初始化了lock,因为我希望这个特定的实例设置结果。

我该怎么做呢?实现还行吗?System.Reactive会提供更好的选择吗?

EN

回答 1

Stack Overflow用户

发布于 2018-12-30 10:39:39

我想我在你的实施中看到了几个问题:

  1. 如果您多次调用Enqueue()而不调用DequeueAsync(),那么您将丢失TaskCompletionSource,并且只有最后一个。然后,在第一次调用之后,调用DequeueAsync()将不会产生正确的结果。

要解决这个问题,您需要一个TaskCompletionSource队列来解决这个问题。看看这里和进一步的这里

更好的方法是,在队列为空时,在DequeueAsync中使用DequeueAsync来正确等待。

  1. 您也不会在DequeueAsync()中保护您的队列。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53968202

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档