我正在参照这篇关于在控制台应用程序中消费(consume)消息的文章(http://www.jarloo.com/listening-to-rabbitmq-events/),目前我关心的是:当用户按下 CTRL+C 并退出应用程序时会发生什么。
最低限度,我希望程序在退出之前先处理完当前消息并发送 ack。由于我是 RabbitMQ 的新手,我不清楚这段代码该如何实现。
我知道 channel.BasicConsume(queueName,true,consumer); 会阻塞线程。
任何帮助都将不胜感激。
发布于 2017-02-07 17:11:24
这是我目前实现的方案,但不确定它是否是最好的方法,或者是否还有改进的空间。我没有使用 cancellationToken,因为我还不确定在这里是否可以使用它。
在我的控制台上,我得到了预期的结果(见截图)

public abstract class QueueConnection : IDisposable
{
internal IConnection _connection;
internal IModel _model;
internal IBasicProperties _properties;
internal RabbitMQSettings _settings;
internal protected object _modelLock = new Object();
/// <summary>
/// Stores the RabbitMQ settings carried by the supplied options wrapper.
/// No connection or channel is opened here.
/// </summary>
/// <param name="queueConfig">Options wrapper whose <c>Value</c> holds the RabbitMQ settings.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="queueConfig"/> is null.</exception>
public QueueConnection(IOptions<RabbitMQSettings> queueConfig)
{
    // Fail with a descriptive exception rather than a NullReferenceException on .Value.
    if (queueConfig == null)
    {
        throw new ArgumentNullException(nameof(queueConfig));
    }

    _settings = queueConfig.Value;
}
/// <summary>
/// Ensures that an open channel (<c>_model</c>) exists, connecting first when the
/// connection is missing or closed, and declares <paramref name="queueName"/> on any
/// newly created channel.
/// </summary>
/// <param name="queueName">Name of the queue to declare; must be non-empty.</param>
/// <returns>Always <c>true</c>.</returns>
/// <exception cref="ArgumentException">Thrown when <paramref name="queueName"/> is null or empty.</exception>
internal bool CreateModel(string queueName)
{
    if (string.IsNullOrEmpty(queueName))
    {
        throw new ArgumentException("The queue name has to be specified before.");
    }

    lock (_modelLock)
    {
        if (!IsConnected)
        {
            Connect();
        }

        // An existing, still-open channel is reused as-is.
        bool channelUsable = _model != null && !_model.IsClosed;
        if (channelUsable)
        {
            return true;
        }

        _model = _connection.CreateModel();

        // With AutoClose enabled, closing the last channel also closes the connection.
        // It is switched on only after a channel exists, because enabling it before any
        // channel is created would close the connection immediately.
        _connection.AutoClose = true;

        // BasicQos(prefetchSize, prefetchCount, global):
        //   0     -> no limit on the prefetched size,
        //   50    -> at most 50 unacknowledged messages are delivered at a time,
        //   false -> the limit is not applied globally.
        _model.BasicQos(0, 50, false);

        const bool isDurable = true;
        const bool isExclusive = false;
        const bool deleteWhenUnused = false;
        _model.QueueDeclare(queueName, isDurable, isExclusive, deleteWhenUnused, null);

        _properties = RabbitMQProperties.CreateDefaultProperties(_model);
        return true;
    }
}
/// <summary>
/// Builds a connection factory from the stored settings and opens a new connection,
/// which replaces the value held in <c>_connection</c>. A previously held connection
/// is not closed by this method.
/// </summary>
public void Connect()
{
    var factory = new ConnectionFactory();
    factory.HostName = _settings.HostName;
    factory.UserName = _settings.UserName;
    factory.Password = _settings.Password;

    // Optional settings override the factory defaults only when they are supplied.
    if (_settings.Port.HasValue)
    {
        factory.Port = _settings.Port.Value;
    }

    if (_settings.Heartbeat.HasValue)
    {
        factory.RequestedHeartbeat = _settings.Heartbeat.Value;
    }

    if (!string.IsNullOrEmpty(_settings.VirtualHost))
    {
        factory.VirtualHost = _settings.VirtualHost;
    }

    _connection = factory.CreateConnection();
}
/// <summary>
/// Gets a value indicating whether a connection object exists and reports itself as open.
/// </summary>
public bool IsConnected => _connection?.IsOpen ?? false;
/// <summary>
/// Returns the underlying connection object as held in <c>_connection</c>
/// (null when no connection has been created yet).
/// </summary>
public object GetConnection() => _connection;
/// <summary>
/// Disposes the current connection when one exists; does nothing otherwise.
/// The channel field (<c>_model</c>) is not touched here.
/// </summary>
public void Disconnect()
{
    _connection?.Dispose();
}
/// <summary>
/// Explicit <see cref="IDisposable"/> implementation; delegates to <c>Disconnect</c>.
/// </summary>
void IDisposable.Dispose() => Disconnect();
}QueueConsumer类
public class QueueConsumer : QueueConnection, IQueueConsumer
{
private EventingBasicConsumer consumer;
/// <summary>
/// Creates a consumer; the options are passed straight to the base class constructor.
/// </summary>
/// <param name="queueConfig">Options wrapper holding the RabbitMQ settings.</param>
public QueueConsumer(IOptions<RabbitMQSettings> queueConfig)
    : base(queueConfig)
{
}
/// <summary>
/// Starts consuming from the queue named in the settings (<c>_settings.QueueName</c>).
/// </summary>
/// <param name="onDequeue">Invoked with the decoded message text and its delivery tag.</param>
/// <param name="onError">Invoked with the exception and delivery tag when <paramref name="onDequeue"/> throws.</param>
public void ReadFromQueue(Action<string, ulong> onDequeue, Action<Exception, ulong> onError)
    => ReadFromQueue(onDequeue, onError, _settings.QueueName);
/// <summary>
/// Ensures a channel exists for <paramref name="queueName"/>, registers an event-based
/// consumer on it and starts consuming with manual acknowledgements.
/// </summary>
/// <param name="onDequeue">Invoked with the UTF-8 decoded message body and its delivery tag.</param>
/// <param name="onError">Invoked with the exception and delivery tag when decoding or <paramref name="onDequeue"/> throws.</param>
/// <param name="queueName">Queue to consume from; must be non-empty (validated by <c>CreateModel</c>).</param>
public void ReadFromQueue(Action<string, ulong> onDequeue, Action<Exception, ulong> onError, string queueName)
{
    CreateModel(queueName);
    consumer = new EventingBasicConsumer(_model);

    // Receive the messages.
    consumer.Received += (o, e) =>
    {
        try
        {
            var queueMessage = Encoding.UTF8.GetString(e.Body);
            onDequeue.Invoke(queueMessage, e.DeliveryTag);
        }
        catch (Exception ex)
        {
            onError.Invoke(ex, e.DeliveryTag);
        }
    };

    // If the consumer is shut down unexpectedly, reconnect and start reading again.
    consumer.Shutdown += (o, e) =>
    {
        // A shutdown requested by this application (e.g. Disconnect/Dispose during a
        // graceful exit) must not bring the consumer back to life; only broker- or
        // library-initiated shutdowns trigger a reconnect.
        if (e.Initiator == RabbitMQ.Client.ShutdownInitiator.Application)
        {
            return;
        }

        // ReadFromQueue re-creates the connection and channel through CreateModel.
        ReadFromQueue(onDequeue, onError, queueName);
    };

    // autoAck is false: messages stay unacknowledged until AcknowledgeMessage is called.
    _model.BasicConsume(queueName, false, consumer);
}
/// <summary>
/// Acknowledges a single delivery on the channel it was received on.
/// </summary>
/// <remarks>
/// Delivery tags are scoped to the channel that delivered the message. If that channel
/// is gone, the acknowledgement is skipped: acking the tag on a freshly created channel
/// would be a protocol error that makes the broker close the new channel. The broker
/// requeues unacknowledged messages of a closed channel, so the message is redelivered.
/// </remarks>
/// <param name="deliveryTag">Delivery tag received together with the message.</param>
public void AcknowledgeMessage(ulong deliveryTag)
{
    lock (_modelLock)
    {
        if (_model == null || _model.IsClosed)
        {
            // The original channel no longer exists; the tag is meaningless elsewhere.
            return;
        }

        // multiple = false: acknowledge only this delivery tag.
        _model.BasicAck(deliveryTag, false);
    }
}
/// <summary>
/// Cancels the active consumer so that no further messages are delivered to it.
/// Does nothing when consuming was never started or the channel is already closed.
/// </summary>
public void StopListening()
{
    // Snapshot the fields so the checks and the call operate on the same objects.
    var activeConsumer = consumer;
    var channel = _model;

    if (activeConsumer == null || channel == null || channel.IsClosed)
    {
        return;
    }

    var consumerTag = activeConsumer.ConsumerTag;
    if (string.IsNullOrEmpty(consumerTag))
    {
        // No consumer tag is available yet, so there is nothing to cancel.
        return;
    }

    channel.BasicCancel(consumerTag);
}
}主类
static ManualResetEvent _quitEvent = new ManualResetEvent(false);
/// <summary>
/// Entry point: builds the service provider, resolves the message processor and
/// blocks until CTRL+C is pressed, at which point the processor is asked to stop.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
public static void Main(string[] args)
{
    IServiceCollection services = new ServiceCollection();
    ConfigureServices(services);
    var serviceProvider = services.BuildServiceProvider();
    Console.WriteLine($"[{DateTime.UtcNow.ToString("dd/MM/yyyy HH:mm:ss")}] -> Worker role started");
    var listener = serviceProvider.GetService<IMessageProcessor>();
    Console.CancelKeyPress += (sender, eArgs) =>
    {
        // Keep the runtime from terminating the process right away; the main thread
        // exits on its own once the quit event is signalled below.
        eArgs.Cancel = true;
        try
        {
            listener.OnStop();
            Console.WriteLine($"[{DateTime.UtcNow.ToString("dd/MM/yyyy HH:mm:ss")}] -> Worker role finished");
        }
        catch (Exception ex)
        {
            // A failing stop must not surface as an unhandled exception on the
            // handler thread; report it and flag the failure through the exit code.
            Console.Error.WriteLine($"[{DateTime.UtcNow.ToString("dd/MM/yyyy HH:mm:ss")}] -> Worker role failed to stop cleanly: {ex}");
            Environment.ExitCode = 1;
        }
        finally
        {
            // Always release the main thread, even when stopping failed.
            _quitEvent.Set();
        }
    };
    _quitEvent.WaitOne();
}
/// <summary>
/// Builds the application configuration from <c>appsettings.json</c>, an optional
/// environment-specific <c>appsettings.{Environment}.json</c> and environment variables,
/// rooted at the current working directory.
/// </summary>
/// <returns>The built configuration root.</returns>
private static IConfigurationRoot GetConfiguration()
{
    // The "Environment" variable selects the environment-specific settings file.
    string environmentName = Environment.GetEnvironmentVariable("Environment");
    string environmentFile = $"appsettings.{environmentName}.json";

    IConfigurationBuilder builder = new ConfigurationBuilder();
    builder = builder.SetBasePath(Directory.GetCurrentDirectory());
    builder = builder.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
    builder = builder.AddJsonFile(environmentFile, optional: true);
    builder = builder.AddEnvironmentVariables();
    return builder.Build();
}
/// <summary>
/// Registers the configuration root, typed options for <c>RabbitMQSettings</c>
/// (bound to the "RabbitMQConfig" section), the queue consumer (singleton) and the
/// message processor (scoped).
/// </summary>
/// <param name="services">Service collection to populate.</param>
private static void ConfigureServices(IServiceCollection services)
{
    IConfigurationRoot appConfiguration = GetConfiguration();

    services
        .AddSingleton<IConfigurationRoot>(appConfiguration)
        // Support typed options.
        .AddOptions()
        .Configure<RabbitMQSettings>(appConfiguration.GetSection("RabbitMQConfig"))
        .AddSingleton<IQueueConsumer, QueueConsumer>()
        .AddScoped<IMessageProcessor, MessageProcessor>();
}
}发布于 2017-02-07 06:49:51
对于您试图解决的问题,我唯一能想到的办法就是把它当作一个事务来看待:只有在收到消息、完整地处理完它并发回 Ack 之后,才认为该事务已完成。
如果您首先处理消息,并且有人在Ack之前终止了应用程序,那么它将再次排队,当我们重新启动应用程序时,它将再次得到处理。
相反,如果您先发送 Ack,然后再处理消息,而此时有人终止了应用程序,则您将丢失该消息。
因此,将整个过程视为事务将使其正常工作,或者另一种选择是再次处理相同的消息。
https://stackoverflow.com/questions/42073326
复制相似问题