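I am writing a cloud application (mostly as a hobby, to learn) that needs a very fast cache, queue, and messaging system. I looked at several of Microsoft's options (hosting is on Azure), and all of them seemed slow relative to my needs. Then I ran into Redis, and its speed is right where I need it.
Another consideration before settling on it was that I want to keep my use of components to a minimum, in case I need to move from Azure to bare metal and so on; I can always host my own Redis.
For learning and for sport, I decided to write a queue system that can work as either AtMostOnce or AtLeastOnce and is reliable in the event of a system failure. The class should also be able to run on multiple machines (in this case, WorkerRoles) and be created through IoC or manually.
Below is what I have so far, before tackling a few things that are not yet implemented (e.g. CancellationTokens, a shared ConnectionMultiplexer). The code below does work: I have tested it on 3 separate WorkerRole instances, and I have also tested crashes and restarts. My concerns are more about problems I can't see, performance problems, and my general lack of experience. If I am doing something wrong, feel free to tell me, but please note that I know packages for this already exist. I just like doing things myself.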
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace CitySurvival.WorkerCommon
{
    /// <summary>
    /// Needed: (2) Redis queues 1 for new messages, 1 for currently processing messages
    /// Needed: processing messages list is FILO
    ///
    /// The queues will only contain the key to the message in redis, which is stored as
    /// a single entity for quick lookup
    ///
    /// jobQueue  -- processingQueue
    /// job:1        job:2
    ///
    /// job:1 (job to do index 1)
    /// job:2 (job to do index 2)
    ///
    /// Finish method, will LREM key, and Remove Key from database
    ///
    /// ON adding a new job, send a Publish to say a new job is added
    ///
    /// ON taking a job, RPOPLPUSH from jobQueue to processingQueue
    ///
    /// Checking for failed jobs, expiration time 10 seconds (this should be long enough
    /// to process anything)
    /// If job stays in processingQueue for longer than timeout, RPOPLPUSH to jobQueue
    ///
    /// TODO: cancellationTokens (being in with autofac for global token or use Factory param)
    /// TODO: Get ConnectionMultiplexer from Constructor, or Factory
    /// </summary>
    public class RedisJobQueue
    {
        public delegate RedisJobQueue Factory(string jobName);

        private IConnectionMultiplexer ConnectionMultiplexer => _lazyConnection.Value;

        private readonly Lazy<IConnectionMultiplexer> _lazyConnection = new Lazy<IConnectionMultiplexer>(() => StackExchange.Redis.ConnectionMultiplexer.Connect("ConnctionString"));

        private readonly string _jobQueue;
        private readonly string _processingQueue;
        private readonly string _subChannel;
        private readonly string _jobName;

        private Task _managementTask;

        private bool _receiving;

        public event EventHandler<JobReceivedEventArgs> OnJobReceived;

        public RedisJobQueue(/*ConnectionMultiplexer multiplexer, */string jobName)
        {
            //_connectionMultiplexer = multiplexer;
            _jobQueue = jobName + ":jobs";
            _processingQueue = jobName + ":process";
            _subChannel = jobName + ":channel";
            _jobName = jobName;
        }

        private IDatabase Database => ConnectionMultiplexer.GetDatabase();

        /// <summary>
        /// When a job is finished, remove it from the processingQueue and from the
        /// cache database.
        /// </summary>
        /// <param name="key"></param>
        /// <param name="failed">Operation failed, requeue for another attempt</param>
        public async Task Finish(string key, bool failed = false)
        {
            var db = Database;
            await db.ListRemoveAsync(_processingQueue, key);

            if (failed)
            {
                // How many times to fail before dead
                if (await db.HashExistsAsync(key, "failedcount"))
                {
                    var count = await db.HashGetAsync(key, "failedcount");
                    if (count.IsInteger)
                    {
                        if ((int) count >= 10)
                        {
                            // for now, delete the key, later we might integrate a dead message
                            // queue
                            await db.KeyDeleteAsync(key);
                            return;
                        }
                    }
                }

                db.HashIncrement(key, "failedcount");
                db.HashDelete(key, "active");
                db.ListRightPush(_jobQueue, key);

                ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
            }
            else
            {
                // Job was successfully run, remove the key
                await db.KeyDeleteAsync(key);
            }
        }

        /// <summary>
        /// Do we consume messages from the queue
        /// </summary>
        /// <returns></returns>
        public RedisJobQueue AsConsumer()
        {
            var sub = ConnectionMultiplexer.GetSubscriber();
            sub.Subscribe(_subChannel, (channel, value) => HandleNewJobs());

            // Assume on starting that we have jobs waiting to be handled
            HandleNewJobs();
            return this;
        }

        /// <summary>
        /// Runs a Task every 10 seconds to see if any remaining items are in
        /// processing queue
        /// </summary>
        /// <returns></returns>
        public RedisJobQueue AsManager()
        {
            _managementTask = Task.Factory.StartNew(async () =>
            {
                while (true)
                {
                    await Task.Delay(10000);
                    var timeToKill = (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds - 10000;
                    RedisValue[] values = Database.ListRange(_processingQueue);
                    foreach (var value in from value in values
                                          let activeTime = (double)Database.HashGet((string)value, "active")
                                          where activeTime < timeToKill
                                          select value)
                    {
                        await Finish(value, true);
                    }
                }
            });

            return this;
        }

        /// <summary>
        /// Move key from JobQueue to processingQueue, get key value from cache.
        ///
        /// Also set the active field. Indicates when job was retrieved so we can monitor
        /// its time.
        /// </summary>
        /// <returns></returns>
        private Dictionary<RedisValue, RedisValue> GetJob()
        {
            Dictionary<RedisValue, RedisValue> value;
            while (true)
            {
                string key = Database.ListRightPopLeftPush(_jobQueue, _processingQueue);
                // If key is null, then nothing was there to get, so no value is available
                if (string.IsNullOrEmpty(key))
                {
                    value = new Dictionary<RedisValue, RedisValue>();
                    break;
                }

                Database.HashSet(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
                value = Database.HashGetAll(key).ToDictionary();

                if (value.Count == 0)
                {
                    Database.ListRemove(_processingQueue, key);
                    continue;
                }
                value.Add("key", key);
                break;
            }
            return value;
        }

        /// <summary>
        /// Move key from JobQueue to processingQueue, get key value from cache.
        ///
        /// Also set the active field. Indicates when job was retrieved so we can monitor
        /// its time.
        /// </summary>
        /// <returns></returns>
        private async Task<Dictionary<RedisValue, RedisValue>> GetJobAsync()
        {
            var db = Database;
            Dictionary<RedisValue, RedisValue> value;
            while (true)
            {
                string key = await db.ListRightPopLeftPushAsync(_jobQueue, _processingQueue);
                // If key is null, then nothing was there to get, so no value is available
                if (string.IsNullOrEmpty(key))
                {
                    value = new Dictionary<RedisValue, RedisValue>();
                    break;
                }

                await db.HashSetAsync(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
                value = (await db.HashGetAllAsync(key)).ToDictionary();

                // if Count is 0, remove it and check for the next job
                if (value.Count == 0)
                {
                    await db.ListRemoveAsync(_processingQueue, key);
                    continue;
                }

                value.Add("key", key);
                break;
            }
            return value;
        }

        /// <summary>
        /// We have received an indicator that new jobs are available
        /// We process until we are out of jobs.
        /// </summary>
        private async void HandleNewJobs()
        {
            if (_receiving)
            {
                Trace.WriteLine("Already Receiving Jobs...");
                return;
            }
            _receiving = true;
            Trace.WriteLine("Trying to get jobs...");
            var job = await GetJobAsync();
            // If a valid job cannot be found, it will return an empty Dictionary
            while (job.Count != 0)
            {
                // Fire the Event
                OnJobReceived?.Invoke(this, new JobReceivedEventArgs(job, job["key"]));
                // Get a new job if there is one
                job = await GetJobAsync();
            }
            _receiving = false;
        }

        /// <summary>
        /// Add a job to the Queue
        /// </summary>
        /// <param name="job"></param>
        public void AddJob(RedisValue job)
        {
            if (job.IsNullOrEmpty) return;

            var id = Database.StringIncrement(_jobName + ":jobid");
            var key = _jobName + ":" + id;
            Database.HashSet(key, "payload", job);
            Database.ListLeftPush(_jobQueue, key);
            ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
            Trace.WriteLine("Added Job");
        }

        /// <summary>
        /// Add a job to the Queue (async)
        /// </summary>
        /// <param name="job"></param>
        public async Task AddJobAsync(RedisValue job)
        {
            if (job.IsNullOrEmpty) return;

            var id = await Database.StringIncrementAsync(_jobName + ":jobid");
            var key = _jobName + ":" + id;
            await Database.HashSetAsync(key, "payload", job);
            await Database.ListLeftPushAsync(_jobQueue, key);
            await ConnectionMultiplexer.GetSubscriber().PublishAsync(_subChannel, "");
            Trace.WriteLine("Added Job");
        }
    }
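}

I have added and edited a few items from the original code, some of them unrelated to the changes listed below. The new code is available on GitHub in this gist: Redis Job/Message Queue.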
Posted on 2015-09-24 08:29:58
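In AsManager you start a long-running task. You should pass TaskCreationOptions.LongRunning to StartNew to help the framework make better scheduling decisions.
Also, the task runs in a while (true) loop with no obvious bail-out condition. It would probably be cleaner to pass it a CancellationToken, so that you can cancel the task and end it on shutdown.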
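
The two suggestions above could be combined roughly as follows. This is only a minimal sketch, not the asker's code: the ManagementLoop class and the sweep and interval parameters are hypothetical names introduced for illustration, and sweep stands in for the "requeue expired jobs" pass. It uses a synchronous loop body on purpose, because with an async lambda StartNew returns a Task<Task> and the dedicated LongRunning thread is released at the first await.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs a periodic "sweep" on a dedicated thread
// until Stop() is called.
public sealed class ManagementLoop : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _task;

    public void Start(Action sweep, TimeSpan interval)
    {
        var token = _cts.Token;

        // LongRunning hints that this work should not occupy a thread-pool thread.
        _task = Task.Factory.StartNew(() =>
        {
            while (!token.IsCancellationRequested)
            {
                // WaitOne returns true as soon as cancellation is signalled,
                // so shutdown does not have to wait out the full interval.
                if (token.WaitHandle.WaitOne(interval))
                {
                    break;
                }

                sweep();
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Signals the loop to end and waits for it to finish.
    // If sweep threw, Wait() rethrows it wrapped in an AggregateException.
    public void Stop()
    {
        _cts.Cancel();
        _task?.Wait();
    }

    public void Dispose()
    {
        Stop();
        _cts.Dispose();
    }
}
```

In the question's class, AsManager would call something like Start with the processing-queue check as the sweep action, and the worker role's shutdown path would call Stop. If the loop body has to stay asynchronous, an alternative is Task.Run with await Task.Delay(interval, token), in which case LongRunning no longer applies.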
https://codereview.stackexchange.com/questions/105560