
Redis message queue (cloud)

Asked by a Code Review user
on 2015-09-24 01:12:10
1 answer · 4.3K views · 0 followers · 7 votes

I'm writing a cloud application (mostly as a hobby, for learning) that needs a very fast cache, queue, and messaging system. I've looked at several different options from Microsoft (hosting is on Azure), and they all seem slow (for my relative needs). Then I ran into Redis, whose speed is right where I need it.

My other thought before settling on this was that I also want to keep my use of components to a minimum, in case I ever need to migrate from Azure to bare metal, etc.; I can always host my own Redis.

For learning and exercise, I decided to write a queue system that can work as either at-most-once or at-least-once, and that is reliable in the event of a system failure. The class should also be able to run on multiple machines (Worker Roles in this case) and be driven by IoC or manually.
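The reliable-queue pattern this builds on can be sketched with a few StackExchange.Redis calls (a minimal sketch, assuming a local Redis instance; the key names are illustrative only):

```csharp
using StackExchange.Redis;

var mux = ConnectionMultiplexer.Connect("localhost");
var db = mux.GetDatabase();

db.ListLeftPush("jobs", "job:1");                        // producer enqueues a key
var key = db.ListRightPopLeftPush("jobs", "processing"); // consumer: atomic move, nothing is lost mid-flight
// ... do the work for `key` ...
db.ListRemove("processing", key);                        // acknowledge on success
```

If the consumer crashes between the pop and the remove, the key is still sitting in `processing`, which is what makes the at-least-once recovery below possible.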

Here is what I have so far, before addressing a few things that aren't implemented yet (e.g. cancellation tokens, a shared ConnectionMultiplexer). The code below does work; I've tested it on 3 different Worker Role instances, including crashing and restarting them. My concerns are more about problems I can't see, performance issues, and my lack of general experience. Feel free to tell me if I'm doing something wrong, but note that I know packages for this already exist. I just like doing things myself.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace CitySurvival.WorkerCommon
{
    /// <summary>
    /// Needed: (2) Redis queues 1 for new messages, 1 for currently processing messages
    /// Needed: processing messages list is FILO
    /// 
    /// The queues will only contain the key to the message in redis, which is stored as
    /// a single entity for quick lookup
    /// 
    /// jobQueue  -- processingQueue
    /// job:1        job:2
    /// 
    /// job:1 (job to do index 1)
    /// job:2 (job to do index 2)
    /// 
    /// Finish method, will LREM key, and Remove Key from database
    /// 
    /// ON adding a new job, send a Publish to say a new job is added
    /// 
    /// ON taking a job, RPOPLPUSH from jobQueue to processingQueue
    /// 
    /// Checking for failed jobs, expiration time 10 seconds (this should be long enough 
    /// to process anything)
    /// If job stays in processingQueue for longer than timeout, RPOPLPUSH to jobQueue
    /// 
    /// TODO: cancellationTokens (bring in with Autofac for a global token, or use a Factory param)
    /// TODO: Get ConnectionMultiplexer from Constructor, or Factory
    /// </summary>
    public class RedisJobQueue
    {
        public delegate RedisJobQueue Factory(string jobName);
        private IConnectionMultiplexer ConnectionMultiplexer => _lazyConnection.Value;

        private readonly Lazy<IConnectionMultiplexer> _lazyConnection = new Lazy<IConnectionMultiplexer>(() => StackExchange.Redis.ConnectionMultiplexer.Connect("ConnectionString"));

        private readonly string _jobQueue;
        private readonly string _processingQueue;
        private readonly string _subChannel;
        private readonly string _jobName;

        private Task _managementTask;

        private bool _receiving;

        public event EventHandler<JobReceivedEventArgs> OnJobReceived; 

        public RedisJobQueue(/*ConnectionMultiplexer multiplexer, */string jobName)
        {
            //_connectionMultiplexer = multiplexer;
            _jobQueue = jobName + ":jobs";
            _processingQueue = jobName + ":process";
            _subChannel = jobName + ":channel";
            _jobName = jobName;
        }



        private IDatabase Database => ConnectionMultiplexer.GetDatabase();

        /// <summary>
        /// When a job is finished, remove it from the processingQueue and from the
        /// cache database.
        /// </summary>
        /// <param name="key"></param>
        /// <param name="failed">Operation failed, requeue for another attempt</param>
        public async Task Finish(string key, bool failed = false)
        {
            var db = Database;
            await db.ListRemoveAsync(_processingQueue, key);

            if (failed)
            {
                // How many times to fail before dead
                if (await db.HashExistsAsync(key, "failedcount"))
                {
                    var count = await db.HashGetAsync(key, "failedcount");
                    if (count.IsInteger)
                    {
                        if ((int) count >= 10)
                        {
                            // for now, delete the key, later we might integrate a dead message
                            // queue
                            await db.KeyDeleteAsync(key);
                            return;
                        }
                    }
                }

                db.HashIncrement(key, "failedcount");
                db.HashDelete(key, "active");
                db.ListRightPush(_jobQueue, key);

                ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
            }
            else
            {
                // Job was successfully run, remove the key
                await db.KeyDeleteAsync(key);
            }
        }

        /// <summary>
        /// Do we consume messages from the queue
        /// </summary>
        /// <returns></returns>
        public RedisJobQueue AsConsumer()
        {
            var sub = ConnectionMultiplexer.GetSubscriber();
            sub.Subscribe(_subChannel, (channel, value) => HandleNewJobs());

            // Assume on starting that we have jobs waiting to be handled
            HandleNewJobs();
            return this;
        }

        /// <summary>
        /// Runs a Task every 10 seconds to see if any remaining items are in
        /// processing queue
        /// </summary>
        /// <returns></returns>
        public RedisJobQueue AsManager()
        {
            _managementTask = Task.Factory.StartNew(async () =>
            {
                while (true)
                {
                    await Task.Delay(10000);
                    var timeToKill = (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds - 10000;
                    RedisValue[] values = Database.ListRange(_processingQueue);
                    foreach (var value in from value in values let activeTime = (double)Database.HashGet((string)value, "active") where activeTime < timeToKill select value)
                    {
                        await Finish(value, true);
                    }
                }
            });

            return this;
        }

        /// <summary>
        /// Move key from JobQueue to processingQueue, get key value from cache.
        /// 
        /// Also set the active field. Indicates when job was retrieved so we can monitor
        /// its time.
        /// </summary>
        /// <returns></returns>
        private Dictionary<RedisValue, RedisValue> GetJob()
        {
            Dictionary<RedisValue, RedisValue> value;
            while (true)
            {
                string key = Database.ListRightPopLeftPush(_jobQueue, _processingQueue);
                // If key is null, then nothing was there to get, so no value is available
                if (string.IsNullOrEmpty(key))
                {
                    value = new Dictionary<RedisValue, RedisValue>();
                    break;
                }

                Database.HashSet(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
                value = Database.HashGetAll(key).ToDictionary();

                if (value.Count == 0)
                {
                    Database.ListRemove(_processingQueue, key);
                    continue;
                }
                value.Add("key", key);
                break;
            }
            return value;
        }

        /// <summary>
        /// Move key from JobQueue to processingQueue, get key value from cache.
        /// 
        /// Also set the active field. Indicates when job was retrieved so we can monitor
        /// its time.
        /// </summary>
        /// <returns></returns>
        private async Task<Dictionary<RedisValue, RedisValue>> GetJobAsync()
        {
            var db = Database;
            Dictionary<RedisValue, RedisValue> value;
            while (true)
            {
                string key = await db.ListRightPopLeftPushAsync(_jobQueue, _processingQueue);
                // If key is null, then nothing was there to get, so no value is available
                if (string.IsNullOrEmpty(key))
                {
                    value = new Dictionary<RedisValue, RedisValue>();
                    break;
                }

                await db.HashSetAsync(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
                value = (await db.HashGetAllAsync(key)).ToDictionary();

                // if Count is 0, remove it and check for the next job
                if (value.Count == 0)
                {
                    await db.ListRemoveAsync(_processingQueue, key);
                    continue;
                }

                value.Add("key", key);

                break;
            }
            return value;
        }

        /// <summary>
        /// We have received an indicator that new jobs are available
        /// We process until we are out of jobs.
        /// </summary>
        private async void HandleNewJobs()
        {
            if (_receiving)
            {
                Trace.WriteLine("Already Receiving Jobs...");
                return;
            }
            _receiving = true;
            Trace.WriteLine("Trying to get jobs...");
            var job = await GetJobAsync();
            // If a valid job cannot be found, it will return an empty Dictionary
            while (job.Count != 0)
            {
                // Fire the Event
                OnJobReceived?.Invoke(this, new JobReceivedEventArgs(job, job["key"]));
                // Get a new job if there is one
                job = await GetJobAsync();
            }
            _receiving = false;
        }

        /// <summary>
        /// Add a job to the Queue
        /// </summary>
        /// <param name="job"></param>
        public void AddJob(RedisValue job)
        {
            if (job.IsNullOrEmpty) return;

            var id = Database.StringIncrement(_jobName + ":jobid");
            var key = _jobName + ":" + id;
            Database.HashSet(key, "payload", job);
            Database.ListLeftPush(_jobQueue, key);
            ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
            Trace.WriteLine("Added Job");
        }

        /// <summary>
        /// Add a job to the Queue (async)
        /// </summary>
        /// <param name="job"></param>
        public async Task AddJobAsync(RedisValue job)
        {
            if (job.IsNullOrEmpty) return;

            var id = await Database.StringIncrementAsync(_jobName + ":jobid");
            var key = _jobName + ":" + id;
            await Database.HashSetAsync(key, "payload", job);
            await Database.ListLeftPushAsync(_jobQueue, key);
            await ConnectionMultiplexer.GetSubscriber().PublishAsync(_subChannel, "");
            Trace.WriteLine("Added Job");
        }
    }
}
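For context, wiring the class up as both producer and consumer might look like the following (a hypothetical usage sketch; it assumes `JobReceivedEventArgs` exposes the job dictionary and its Redis key, as suggested by the `HandleNewJobs` call above):

```csharp
// Hypothetical usage; JobReceivedEventArgs members are assumed, not shown above.
var queue = new RedisJobQueue("email")
    .AsConsumer()   // subscribe to the channel and start draining the job queue
    .AsManager();   // periodically requeue jobs stuck in the processing queue

queue.OnJobReceived += async (sender, args) =>
{
    // args carries the job hash and its Redis key
    Console.WriteLine("processing " + args.Key);
    await queue.Finish(args.Key);   // remove on success; Finish(key, true) to retry
};

await queue.AddJobAsync("send welcome mail to user 42");
```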

New code

I've added and edited some entries in the original code, and have also listed some unrelated changes below. The new code is available on GitHub in this gist: Redis job/message queue GitHub Gist.


1 Answer

Answered by a Code Review user
on 2015-09-24 08:29:58

In AsManager, you start a long-running task. You should pass TaskCreationOptions.LongRunning to StartNew to help the framework make better decisions.

Also, the task runs in a while (true) loop with no obvious bail-out condition. It might be a bit cleaner if you pass it a CancellationToken, so you can cancel the task and end it on shutdown.
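Both suggestions applied to the loop might look like this (a sketch, not the asker's code; where the token comes from, e.g. the host's shutdown handling, is an assumption):

```csharp
public RedisJobQueue AsManager(CancellationToken token)
{
    _managementTask = Task.Factory.StartNew(async () =>
    {
        while (!token.IsCancellationRequested)
        {
            // Task.Delay observes the token, so shutdown is prompt
            await Task.Delay(10000, token);
            // ... scan _processingQueue for expired jobs as before ...
        }
    }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

    return this;
}
```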

2 votes
Page content originally from Code Review.
Original link:

https://codereview.stackexchange.com/questions/105560
