首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >等待BlockingCollection完成

等待BlockingCollection完成
EN

Stack Overflow用户
提问于 2021-06-23 22:22:20
回答 1查看 71关注 0票数 0

对于多线程应用程序,我想执行await操作,直到BlockingCollection完成并为空(IsCompleted = true)。我实现了下面的代码,这似乎起作用了。

因为它是多线程的,我甚至不相信我自己的影子。这是一个健壮的实现吗?

代码语言:javascript
复制
public class BlockingCollectionEx<T> : BlockingCollection<T>
{
    public Task WaitCompleted => completedManualResetEvent.Task;
    private readonly TaskCompletionSource completedManualResetEvent = new();

    public new void CompleteAdding()
    {
        base.CompleteAdding();

        lock (completedManualResetEvent)
        {
            if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }

    public new IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var item in base.GetConsumingEnumerable())
            yield return item;

        lock (completedManualResetEvent) //if GetConsumingEnumerable is used by multiple threads, the 2nd one would throw an InvalidOperationException 
        {
            if (!completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }
    public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new T Take() => throw new NotImplementedException();
    public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}

用法:

代码语言:javascript
复制
var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();

Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
        // do stuff in Task 1
});
Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
        // do stuff in Task 2
});

await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is emtpy
EN

回答 1

Stack Overflow用户

发布于 2021-06-23 23:46:33

您的实现对于一般用途来说并不健壮,但对于遵守以下合同的应用程序来说可能已经足够好了:

该集合只能由一个使用者使用,该使用者只使用GetConsumingEnumerable方法使用它。

  1. 如果没有使用者,集合为空,并且调用了CompleteAdding方法,则WaitCompleted任务永远不会完成。
  2. 如果有两个或更多使用者,枚举将失败,除一个使用者外,所有使用者都将出现InvalidOperationException。如果有一个使用者,但通过使用TakeTryTake方法使用集合,则<代码>H215<代码>G216任务将永远不会完成。

如果不了解您的特定用例,我不能说您是否有合法的理由请求此功能。不过,一般来说,等待BlockingCollection<T>变为空并完成的确切时刻通常并不重要。重要的是所有使用项的处理完成的确切时刻,这发生在集合完成之后。

注意:此答案针对此问题的Revision 1版本。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68101714

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档