首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >访问者模式/漏桶变体实现,以在一定时间间隔内运行操作

访问者模式/漏桶变体实现,以在一定时间间隔内运行操作
EN

Code Review用户
提问于 2016-09-27 17:32:36
回答 3查看 704关注 0票数 7

我的代码是访问者模式的变体和“漏桶”变体。目标非常简单:“桶”将收集指定数量的项(例如,500项),然后清空队列,对每个项(“访问”)运行一些CPU绑定操作。(是的,我知道可能有一些与我所做的类似的现有代码-这段代码实际上并不是一个新颖的想法)。

我以前读过关于如何实现访问者模式的文档,但这是我第一次真正实现访问者模式。下面我有一种“有效”的方法来实现这一点吗?另外,我是否正确地实现了并发/线程安全?我认为它应该没有比赛条件,但也有可能我可以更简单地实现这一点。

代码语言:javascript
复制
/// <summary>
/// "Overflowing bucket" implementation
/// </summary>
/// <typeparam name="T">Type of the data structure we're operating on</typeparam>
/// <typeparam name="U">Type of the items in the data structure</typeparam>
/// <remarks>
/// The basic idea of this data structure is that it'll collect a certain number of items and then empty the queue.
/// 
/// The "overflowing bucket" metaphor isn't perfect because every time that the "bucket" is "filled to the brim" or starts
/// to "overflow" we just empty the whole thing.
/// </remarks>
public class OverflowingBucket<T, U>
{
    #region Fields
    // Not volatile - this will only be accessed by the background thread
    private readonly T itemToActOn;

    // Not volatile - this will only be accessed by the background thread
    private readonly Action<T, U> visitorOperation;

    // This action runs after we empty the queue
    private readonly Action<T> afterAction;

    // Must be concurrent because we could add to the queue while an operation is in place
    private readonly ConcurrentQueue<U> queue;

    // Will be accessed by multiple threads
    private volatile bool inProgress = false;

    // Obviously used for locking
    private readonly object lockObj = new object();
    #endregion Fields

    #region Constructor
    /// <summary>
    /// Create a new Overflowing Bucket
    /// </summary>
    /// <param name="itemToActOn">Item that is "visited." Required.</param>
    /// <param name="visitorOperation">Visitor operation. Required.</param>
    /// <param name="afterAction">Action that occurs after each time that the queue is emptied. Optional.</param>
    /// <param name="leakyNumber">Number of items that the queue may contain before the bucket starts "leaking."</param>
    public OverflowingBucket(T itemToActOn, Action<T, U> visitorOperation, Action<T> afterAction, int leakyNumber = 50)
    {
        this.itemToActOn = itemToActOn;
        this.visitorOperation = visitorOperation;
        this.afterAction = afterAction;
        queue = new ConcurrentQueue<U>();
        LeakyNumber = leakyNumber;
    }
    #endregion Constructor

    #region Properties
    /// <summary>
    /// Get or set the number at which we run an iteration
    /// </summary>
    public int LeakyNumber
    {
        get;
        set;
    }

    /// <summary>
    /// Get a value indicating whether a "visit" is already in progress (i.e. we are actively emptying the queue)
    /// </summary>
    public bool InProgress
    {
        get { return inProgress; }
    }
    #endregion Properties

    #region Public Methods
    /// <summary>
    /// Add an item, emptying the queue if necessary
    /// </summary>
    /// <param name="item">Item to add to the "bucket"</param>
    /// <returns>Handle to await the outcome</returns>
    public async Task Add(U item)
    {
        queue.Enqueue(item);

        if (queue.Count >= LeakyNumber && !inProgress)
        {
            await Visit();
        }
    }

    /// <summary>
    /// Visit the data structure. This will ignore successive calls (i.e. calls that occur while a previous"visit" operation is running).
    /// </summary>
    /// <returns>Handle to await the boolean result; result is <c>true</c> if we successfully ran the
    /// operation and <c>false</c> if the operation failed or never ran</returns>
    public async Task<bool> Visit()
    {
        // If there's already an operation in progress, ignore the call
        if (!inProgress)
        {
            bool result = true;
            await Task.Run(
                delegate ()
                {
                    lock (lockObj)
                    {
                        // "Double check" pattern to prevent race conditions
                        if (!inProgress)
                        {
                            inProgress = true;

                            // Completely empty the queue
                            while (queue.Any())
                            {
                                U item;

                                bool success = queue.TryDequeue(out item);

                                if (success) visitorOperation(itemToActOn, item);
                                else result = false;
                            }

                            // The "afterAction" operation is optional, so only
                            afterAction?.Invoke(itemToActOn);

                            inProgress = false;
                        } // End double-check condition
                    } // Release lock
                }); // End Task

            return result;
        } // End if inProgress

        // TODO: Should we throw an exception if we're already Visiting?
        return false;
    } // End Visit method
    #endregion Public Methods
} // End OverflowingBucket class

这里有一个代码示例,我将其称为此代码(以便更好地了解我所要做的事情)。

这可能不是一个很好的例子,因为所讨论的操作并不是特定的CPU限制(这显然是Task.Run操作的主要目的),但它仍然应该给出我想要做的事情的想法。

代码语言:javascript
复制
private static void TestOverflowingBucket()
    {
        var doc = new XmlDocument();
        doc.LoadXml("<QATestLog></QATestLog>");

        Action<XmlDocument, XmlElement> operation =
            delegate (XmlDocument document, XmlElement element)
            {
                document.DocumentElement.AppendChild(element);
            };

        Action<XmlDocument> afterOperation = xmlDoc => xmlDoc.Save("TestDocument.xml");

        var bucket = new OverflowingBucket<XmlDocument, XmlElement>(doc, operation, afterOperation);

        // AsyncPump is to correct some "oddities" in the way that async/await works in a console application
        // Source: https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/
        AsyncPump.Run(async () =>
        {
            List<Task> tasks = new List<Task>();

            Random random = new Random();

            for (int i = 0; i < 1003; i++)
            {
                XmlElement newElement = doc.CreateElement("step");

                XmlAttribute newAttribute = doc.CreateAttribute("number");
                newAttribute.Value = i.ToString();

                newElement.Attributes.Append(newAttribute);

                // Don't await, it's OK for us to continue to add to the queue while an execution is happening
                // This will generate a compiler warning since Add is async and we don't await
                Task task = bucket.Add(newElement);

                // For added realism, wait some random time between adds
                await Task.Delay(random.Next(1, 1000));

                if (!task.IsCompleted)
                    tasks.Add(task);
            }

            if (bucket.InProgress)
            {
                Task.WaitAll(tasks.ToArray());
            }
            else
            {
                // If there are any "left over" that weren't "picked up" by a previous Visit execution
                await bucket.Visit();
            }
        });
    }
EN

回答 3

Code Review用户

回答已采纳

发布于 2016-09-28 06:00:56

首先,IMHO代码写得很好,并有一些有意义的评论。

访问者模式

实际上,我不确定OverflowingBucket是否与访客模式有关。在我看来,访问者模式可以通过实现访问者将逻辑添加到数据结构中。访问者访问数据结构的每个元素,并根据元素的类型决定该做什么。

OverflowingBucket可用于向对象添加逻辑,以便以特殊方式处理其他项。所以我不会在这里用“访客模式”这个词。

OverflowingBucket

  • 因为它不是我眼中的访问者,所以我会将方法Visit重命名为Process或类似的东西。
  • 考虑将方法Process设置为私有,并添加另一个方法ProcessPendingItems或类似的方法。
  • Visit返回false的唯一情况是,如果queue.Any()为true,而queue.TryDequeue(out item)为false。在现在的课堂上不应该发生这种事.因此,我会使这个方法无效。

TestOverflowingBucket

  • 延迟不是延迟添加任务,而是延迟主线程。在操作委托中添加Thread.Sleep()将更加现实。
  • 没有必要检查任务是否完成。您可以将它们全部添加到tasks列表中。
票数 6
EN

Code Review用户

发布于 2016-09-28 11:54:56

谈到async/await,我更喜欢简单。await很有用,当您拥有在Task完成后执行的代码时,代码就会变得更简洁。使用它只是为了沿着方法传递任务(如在public async Task Add(U item)中),就会产生噪音。

public async Task<bool> Visit()中,您只使用await等待任务完成,以便返回其结果。我宁愿让委托返回一个bool,而不是捕获result并修改它,然后我只返回Task.RunTask.FromResult在另一个分支中创建的任务。

票数 2
EN

Code Review用户

发布于 2016-09-28 06:55:47

我基本上同意JanDotNet的观点。

我不确定这是否是个好主意,但是也许OverflowingBucket应该把自己清理成一个IDisposable呢?

票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/142629

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档