对于多线程应用程序,我想执行await操作,直到BlockingCollection完成并为空(IsCompleted = true)。我实现了下面的代码,这似乎起作用了。
因为它是多线程的,我甚至不相信我自己的影子。这是一个健壮的实现吗?
public class BlockingCollectionEx<T> : BlockingCollection<T>
{
public Task WaitCompleted => completedManualResetEvent.Task;
private readonly TaskCompletionSource completedManualResetEvent = new();
public new void CompleteAdding()
{
base.CompleteAdding();
lock (completedManualResetEvent)
{
if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable()
{
foreach (var item in base.GetConsumingEnumerable())
yield return item;
lock (completedManualResetEvent) //if GetConsumingEnumerable is used by multiple threads, the 2nd one would throw an InvalidOperationException
{
if (!completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();
public new T Take() => throw new NotImplementedException();
public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}用法:
var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 1
});
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 2
});
await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is emtpy发布于 2021-06-23 23:46:33
您的实现对于一般用途来说并不健壮,但对于遵守以下合同的应用程序来说可能已经足够好了:
该集合只能由一个使用者使用,该使用者只使用
GetConsumingEnumerable方法使用它。
CompleteAdding方法,则WaitCompleted任务永远不会完成。InvalidOperationException。如果有一个使用者,但通过使用Take或TryTake方法使用集合,则<代码>H215<代码>G216任务将永远不会完成。如果不了解您的特定用例,我不能说您是否有合法的理由请求此功能。不过,一般来说,等待BlockingCollection<T>变为空并完成的确切时刻通常并不重要。重要的是所有使用项的处理完成的确切时刻,这发生在集合完成之后。
注意:此答案针对此问题的Revision 1版本。
https://stackoverflow.com/questions/68101714
复制相似问题