我的代码是访问者模式的变体和“漏桶”变体。目标非常简单:“桶”将收集指定数量的项(例如,500项),然后清空队列,对每个项(“访问”)运行一些CPU绑定操作。(是的,我知道可能有一些与我所做的类似的现有代码-这段代码实际上并不是一个新颖的想法)。
我以前读过关于如何实现访问者模式的文档,但这是我第一次真正实现访问者模式。下面我有一种“有效”的方法来实现这一点吗?另外,我是否正确地实现了并发/线程安全?我认为它应该没有比赛条件,但也有可能我可以更简单地实现这一点。
/// <summary>
/// "Overflowing bucket" implementation
/// </summary>
/// <typeparam name="T">Type of the data structure we're operating on</typeparam>
/// <typeparam name="U">Type of the items in the data structure</typeparam>
/// <remarks>
/// The basic idea of this data structure is that it'll collect a certain number of items and then empty the queue.
///
/// The "overflowing bucket" metaphor isn't perfect because every time that the "bucket" is "filled to the brim" or starts
/// to "overflow" we just empty the whole thing.
/// </remarks>
public class OverflowingBucket<T, U>
{
#region Fields
// Not volatile - this will only be accessed by the background thread
private readonly T itemToActOn;
// Not volatile - this will only be accessed by the background thread
private readonly Action<T, U> visitorOperation;
// This action runs after we empty the queue
private readonly Action<T> afterAction;
// Must be concurrent because we could add to the queue while an operation is in place
private readonly ConcurrentQueue<U> queue;
// Will be accessed by multiple threads
private volatile bool inProgress = false;
// Obviously used for locking
private readonly object lockObj = new object();
#endregion Fields
#region Constructor
/// <summary>
/// Create a new Overflowing Bucket
/// </summary>
/// <param name="itemToActOn">Item that is "visited." Required.</param>
/// <param name="visitorOperation">Visitor operation. Required.</param>
/// <param name="afterAction">Action that occurs after each time that the queue is emptied. Optional.</param>
/// <param name="leakyNumber">Number of items that the queue may contain before the bucket starts "leaking."</param>
public OverflowingBucket(T itemToActOn, Action<T, U> visitorOperation, Action<T> afterAction, int leakyNumber = 50)
{
this.itemToActOn = itemToActOn;
this.visitorOperation = visitorOperation;
this.afterAction = afterAction;
queue = new ConcurrentQueue<U>();
LeakyNumber = leakyNumber;
}
#endregion Constructor
#region Properties
/// <summary>
/// Get or set the number at which we run an iteration
/// </summary>
public int LeakyNumber
{
get;
set;
}
/// <summary>
/// Get a value indicating whether a "visit" is already in progress (i.e. we are actively emptying the queue)
/// </summary>
public bool InProgress
{
get { return inProgress; }
}
#endregion Properties
#region Public Methods
/// <summary>
/// Add an item, emptying the queue if necessary
/// </summary>
/// <param name="item">Item to add to the "bucket"</param>
/// <returns>Handle to await the outcome</returns>
public async Task Add(U item)
{
queue.Enqueue(item);
if (queue.Count >= LeakyNumber && !inProgress)
{
await Visit();
}
}
/// <summary>
/// Visit the data structure. This will ignore successive calls (i.e. calls that occur while a previous"visit" operation is running).
/// </summary>
/// <returns>Handle to await the boolean result; result is <c>true</c> if we successfully ran the
/// operation and <c>false</c> if the operation failed or never ran</returns>
public async Task<bool> Visit()
{
// If there's already an operation in progress, ignore the call
if (!inProgress)
{
bool result = true;
await Task.Run(
delegate ()
{
lock (lockObj)
{
// "Double check" pattern to prevent race conditions
if (!inProgress)
{
inProgress = true;
// Completely empty the queue
while (queue.Any())
{
U item;
bool success = queue.TryDequeue(out item);
if (success) visitorOperation(itemToActOn, item);
else result = false;
}
// The "afterAction" operation is optional, so only
afterAction?.Invoke(itemToActOn);
inProgress = false;
} // End double-check condition
} // Release lock
}); // End Task
return result;
} // End if inProgress
// TODO: Should we throw an exception if we're already Visiting?
return false;
} // End Visit method
#endregion Public Methods
} // End OverflowingBucket class这里有一个代码示例,我将其称为此代码(以便更好地了解我所要做的事情)。
这可能不是一个很好的例子,因为所讨论的操作并不是特定的CPU限制(这显然是Task.Run操作的主要目的),但它仍然应该给出我想要做的事情的想法。
private static void TestOverflowingBucket()
{
var doc = new XmlDocument();
doc.LoadXml("<QATestLog></QATestLog>");
Action<XmlDocument, XmlElement> operation =
delegate (XmlDocument document, XmlElement element)
{
document.DocumentElement.AppendChild(element);
};
Action<XmlDocument> afterOperation = xmlDoc => xmlDoc.Save("TestDocument.xml");
var bucket = new OverflowingBucket<XmlDocument, XmlElement>(doc, operation, afterOperation);
// AsyncPump is to correct some "oddities" in the way that async/await works in a console application
// Source: https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/
AsyncPump.Run(async () =>
{
List<Task> tasks = new List<Task>();
Random random = new Random();
for (int i = 0; i < 1003; i++)
{
XmlElement newElement = doc.CreateElement("step");
XmlAttribute newAttribute = doc.CreateAttribute("number");
newAttribute.Value = i.ToString();
newElement.Attributes.Append(newAttribute);
// Don't await, it's OK for us to continue to add to the queue while an execution is happening
// This will generate a compiler warning since Add is async and we don't await
Task task = bucket.Add(newElement);
// For added realism, wait some random time between adds
await Task.Delay(random.Next(1, 1000));
if (!task.IsCompleted)
tasks.Add(task);
}
if (bucket.InProgress)
{
Task.WaitAll(tasks.ToArray());
}
else
{
// If there are any "left over" that weren't "picked up" by a previous Visit execution
await bucket.Visit();
}
});
}发布于 2016-09-28 06:00:56
首先,IMHO代码写得很好,并有一些有意义的评论。
实际上,我不确定OverflowingBucket是否与访客模式有关。在我看来,访问者模式可以通过实现访问者将逻辑添加到数据结构中。访问者访问数据结构的每个元素,并根据元素的类型决定该做什么。
OverflowingBucket可用于向对象添加逻辑,以便以特殊方式处理其他项。所以我不会在这里用“访客模式”这个词。
Visit重命名为Process或类似的东西。Process设置为私有,并添加另一个方法ProcessPendingItems或类似的方法。Visit返回false的唯一情况是,如果queue.Any()为true,而queue.TryDequeue(out item)为false。在现在的课堂上不应该发生这种事.因此,我会使这个方法无效。Thread.Sleep()将更加现实。tasks列表中。发布于 2016-09-28 11:54:56
谈到async/await,我更喜欢简单。await很有用,当您拥有在Task完成后执行的代码时,代码就会变得更简洁。使用它只是为了沿着方法传递任务(如在public async Task Add(U item)中),就会产生噪音。
在public async Task<bool> Visit()中,您只使用await等待任务完成,以便返回其结果。我宁愿让委托返回一个bool,而不是捕获result并修改它,然后我只返回Task.Run或Task.FromResult在另一个分支中创建的任务。
发布于 2016-09-28 06:55:47
我基本上同意JanDotNet的观点。
我不确定这是否是个好主意,但是也许OverflowingBucket应该把自己清理成一个IDisposable呢?
https://codereview.stackexchange.com/questions/142629
复制相似问题