首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rabbit句柄取消令牌

Rabbit句柄取消令牌
EN

Stack Overflow用户
提问于 2017-02-07 01:03:49
回答 2查看 1.9K关注 0票数 2

我正在关注这篇在控制台应用程序上使用消息的文章(http://www.jarloo.com/listening-to-rabbitmq-events/),目前我关心的是当用户按下CTRL +C并退出应用程序时会发生什么。

作为最低限度,我希望它在退出程序之前完成对当前消息和ack的处理。我对如何实现这段代码感到困惑,因为我是RabbitMQ的新手。

我知道channel.BasicConsume(queueName,true,consumer);是阻塞线程。

任何帮助都将不胜感激。

EN

回答 2

Stack Overflow用户

发布于 2017-02-07 17:11:24

这是我设法实现的,但不确定这是最好的方法还是有改进。我没有使用cancellationToken,因为我还不确定我们是否可以使用它。

在我的控制台上,我得到了预期的结果(见截图)

代码语言:javascript
复制
public abstract class QueueConnection : IDisposable
{
    internal IConnection _connection;
    internal IModel _model;
    internal IBasicProperties _properties;
    internal RabbitMQSettings _settings;
    internal protected object _modelLock = new Object();

    public QueueConnection(IOptions<RabbitMQSettings> queueConfig)
    {
        _settings = queueConfig.Value;
    }

    internal bool CreateModel(string queueName)
    {
        if (string.IsNullOrEmpty(queueName))
        {
            throw new ArgumentException("The queue name has to be specified before.");
        }

        lock (_modelLock)
        {
            if (!IsConnected) Connect();
            if (_model == null || _model.IsClosed)
            {
                _model = _connection.CreateModel();

                // When AutoClose is true, the last channel to close will also cause the connection to close. 
                // If it is set to true before any channel is created, the connection will close then and there.
                _connection.AutoClose = true;

                // Configure the Quality of service for the model. Below is how what each setting means.
                // BasicQos(0="Dont send me a new message untill I’ve finshed",  1= "Send me one message at a time", false ="Apply to this Model only")
                _model.BasicQos(0, 50, false);

                const bool durable = true, queueAutoDelete = false, exclusive = false;

                _model.QueueDeclare(queueName, durable, exclusive, queueAutoDelete, null);
                _properties = RabbitMQProperties.CreateDefaultProperties(_model);
            }
        }

        return true;
    }

    public void Connect()
    {
        var connectionFactory = new ConnectionFactory
        {
            HostName = _settings.HostName,
            UserName = _settings.UserName,
            Password = _settings.Password,
        };


        if (_settings.Port.HasValue) connectionFactory.Port = _settings.Port.Value;
        if (_settings.Heartbeat.HasValue) connectionFactory.RequestedHeartbeat = _settings.Heartbeat.Value;
        if (!string.IsNullOrEmpty(_settings.VirtualHost)) connectionFactory.VirtualHost = _settings.VirtualHost;


        _connection = connectionFactory.CreateConnection();
    }

    public bool IsConnected
    {
        get { return _connection != null && _connection.IsOpen; }
    }

    public object GetConnection()
    {
        return _connection;
    }

    public void Disconnect()
    {
        if (_connection != null) _connection.Dispose();
    }

    void IDisposable.Dispose()
    {
        Disconnect();
    }
}

QueueConsumer类

代码语言:javascript
复制
public class QueueConsumer : QueueConnection, IQueueConsumer
{
    private EventingBasicConsumer consumer;
    public QueueConsumer(IOptions<RabbitMQSettings> queueConfig)
        :base(queueConfig) {}

    public void ReadFromQueue(Action<string, ulong> onDequeue, Action<Exception, ulong> onError)
    {
        ReadFromQueue(onDequeue, onError, _settings.QueueName);
    }

    public void ReadFromQueue(Action<string, ulong> onDequeue, Action<Exception, ulong> onError, string queueName)
    {

        CreateModel(queueName);

        consumer = new EventingBasicConsumer(_model);

        // Receive the messages
        consumer.Received += (o, e) =>
        {
            try
            {
                var queueMessage = Encoding.UTF8.GetString(e.Body);
                onDequeue.Invoke(queueMessage, e.DeliveryTag);
            }
            catch (Exception ex)
            {
                onError.Invoke(ex, e.DeliveryTag);
            }
        };

        // if the consumer shutdown reconnects to rabbitmq and begin reading from the queue again.
        consumer.Shutdown += (o, e) =>
        {
            CreateModel(queueName);
            ReadFromQueue(onDequeue, onError, queueName);
        };

        _model.BasicConsume(queueName, false, consumer);

    }

    public void AcknowledgeMessage(ulong deliveryTag)
    {
        if (!IsConnected) Connect();
        CreateModel(_settings.QueueName);
        _model.BasicAck(deliveryTag, false);
    }

    public void StopListening()
    {
        _model.BasicCancel(consumer.ConsumerTag);
    }
}

主类

代码语言:javascript
复制
    static ManualResetEvent _quitEvent = new ManualResetEvent(false);


    public static void Main(string[] args)
    {

        IServiceCollection services = new ServiceCollection();
        ConfigureServices(services);

        var serviceProvider = services.BuildServiceProvider();

        Console.WriteLine($"[{DateTime.UtcNow.ToString("dd/MM/yyyy HH:mm:ss")}] -> Worker role started");

        var listener = serviceProvider.GetService<IMessageProcessor>();

        Console.CancelKeyPress += (sender, eArgs) =>
        {
            listener.OnStop();

            Console.WriteLine($"[{DateTime.UtcNow.ToString("dd/MM/yyyy HH:mm:ss")}] -> Worker role finished");
            _quitEvent.Set();
            eArgs.Cancel = true;
        };

        _quitEvent.WaitOne();
    }

    private static IConfigurationRoot GetConfiguration()
    {
        // Build appsetting.json configuration
        var environment = Environment.GetEnvironmentVariable("Environment");

        return new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
            .AddJsonFile($"appsettings.{environment}.json", optional: true)
            .AddEnvironmentVariables().Build();
    }

    private static void ConfigureServices(IServiceCollection services)
    {
        IConfigurationRoot configuration = GetConfiguration();
        services.AddSingleton<IConfigurationRoot>(configuration);

        // Support typed options
        services.AddOptions();

        services.Configure<RabbitMQSettings>(configuration.GetSection("RabbitMQConfig"));

        services.AddSingleton<IQueueConsumer, QueueConsumer>();
        services.AddScoped<IMessageProcessor, MessageProcessor>();

    }
}
票数 1
EN

Stack Overflow用户

发布于 2017-02-07 06:49:51

对于您试图绕过的问题,我唯一能想到的就是将其视为事务。只有当您收到消息时,才会完整地处理它,并将Ack发回,认为事务已完成。

如果您首先处理消息,并且有人在Ack之前终止了应用程序,那么它将再次排队,当我们重新启动应用程序时,它将再次得到处理。

相反,如果您先单击Ack,然后尝试处理消息,而有人终止了应用程序,则您将丢失该消息。

因此,将整个过程视为事务将使其正常工作,或者另一种选择是再次处理相同的消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42073326

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档