首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >基于可等待任务的队列

基于可等待任务的队列
EN

Stack Overflow用户
提问于 2011-10-23 08:36:40
回答 10查看 42.4K关注 0票数 47

我想知道是否存在类似于BlockingCollectionConcurrentQueue实现/包装器,在这种情况下,从集合中获取数据不会阻塞,而是异步的,并且会导致异步等待,直到将项放入队列。

我已经提出了我自己的实现,但它似乎没有像预期的那样执行。我在想,我是不是在重新创造已经存在的东西。

下面是我的实现:

代码语言:javascript
复制
public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}
EN

回答 10

Stack Overflow用户

回答已采纳

发布于 2011-10-24 02:40:08

我不知道有什么无锁的解决方案,但你可以看看新的Dataflow library,它是Async CTP的一部分。一个简单的BufferBlock<T>应该足够了,例如:

代码语言:javascript
复制
BufferBlock<int> buffer = new BufferBlock<int>();

生产和消费最容易通过数据流块类型上的扩展方法来完成。

生产很简单,如下所示:

代码语言:javascript
复制
buffer.Post(13);

而消费是异步就绪的:

代码语言:javascript
复制
int item = await buffer.ReceiveAsync();

如果可能的话,我建议您使用Dataflow;使这样的缓冲区既高效又正确比它最初看起来要困难得多。

票数 67
EN

Stack Overflow用户

发布于 2019-04-30 09:18:39

使用C# 8.0 IAsyncEnumerableDataflow library的简单方法

代码语言:javascript
复制
// Instatiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

AsyncQueue的实现如下:

代码语言:javascript
复制
public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) =>
        _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync();
        try {
            // Return new elements until cancellationToken is triggered.
            while (true) {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } finally {
            _enumerationSemaphore.Release();
        }

    }
}
票数 32
EN

Stack Overflow用户

发布于 2021-04-10 02:40:23

现在有一种官方方法可以做到这一点:System.Threading.Channels。它内置于.NET Core3.0及更高版本(包括.NET 5.0和6.0)的核心运行时中,但在.NET标准2.0和2.1上也可以作为NuGet包使用。您可以通读文档here

代码语言:javascript
复制
var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

要将工作入队,请执行以下操作:

代码语言:javascript
复制
// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

要完成通道,请执行以下操作:

代码语言:javascript
复制
channel.Writer.TryComplete();

要从通道读取,请执行以下操作:

代码语言:javascript
复制
var i = await channel.Reader.ReadAsync();

或者,如果您使用的是.NET Core3.0或更高版本:

代码语言:javascript
复制
await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}
票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/7863573

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档