首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RabbitMQ持久队列无法工作(RPC、RPC)

RabbitMQ持久队列无法工作(RPC、RPC)
EN

Stack Overflow用户
提问于 2015-07-12 16:13:14
回答 1查看 1.1K关注 0票数 11

我想知道为什么我的RabbitMQ RPC客户端总是在重启后处理死消息。_channel.QueueDeclare(queue, false, false, false, null);应该禁用缓冲区。如果我在RPC客户机中重载了QueueDeclare,我就无法连接到服务器。这里出什么问题了吗?知道怎么解决这个问题吗?

RPC-服务器

代码语言:javascript
复制
new Thread(() =>
{
    var factory = new ConnectionFactory { HostName = _hostname };
    if (_port > 0)
        factory.Port = _port;
    _connection = factory.CreateConnection();
    _channel = _connection.CreateModel();

    _channel.QueueDeclare(queue, false, false, false, null);
    _channel.BasicQos(0, 1, false);
    var consumer = new QueueingBasicConsumer(_channel);
    _channel.BasicConsume(queue, false, consumer);
    IsRunning = true;
    while (IsRunning)
    {
        BasicDeliverEventArgs ea;
        try {
            ea = consumer.Queue.Dequeue();
        }
        catch (Exception ex) {
            IsRunning = false;
        }
        var body = ea.Body;
        var props = ea.BasicProperties;
        var replyProps = _channel.CreateBasicProperties();
        replyProps.CorrelationId = props.CorrelationId;

        var xmlRequest = Encoding.UTF8.GetString(body);

        var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message;
        var messageResponse = handler(messageRequest);

        _channel.BasicPublish("", props.ReplyTo, replyProps,
                                messageResponse);
        _channel.BasicAck(ea.DeliveryTag, false);
    }
}).Start();

RPC-客户端

代码语言:javascript
复制
public void Start()
{
    if (IsRunning)
        return;
    var factory = new ConnectionFactory { 
        HostName = _hostname,
        Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint) 
                              : new AmqpTcpEndpoint(_endpoint, _port)
    };
    _connection = factory.CreateConnection();
    _channel = _connection.CreateModel();
    _replyQueueName = _channel.QueueDeclare(); // Do not connect any more
    _consumer = new QueueingBasicConsumer(_channel);
    _channel.BasicConsume(_replyQueueName, true, _consumer);
    IsRunning = true;
}

public Message Call(Message message)
{
    if (!IsRunning)
        throw new Exception("Connection is not open.");
    var corrId = Guid.NewGuid().ToString().Replace("-", "");
    var props = _channel.CreateBasicProperties();
    props.ReplyTo = _replyQueueName;
    props.CorrelationId = corrId;

    if (!String.IsNullOrEmpty(_application))
        props.AppId = _application;

    message.InitializeProperties(_hostname, _nodeId, _uniqueId, props);

    var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message));
    _channel.BasicPublish("", _queue, props, messageBytes);

    try 
    {
        while (IsRunning)
        {
            var ea = _consumer.Queue.Dequeue();
            if (ea.BasicProperties.CorrelationId == corrId)
            {
                var xmlResponse = Encoding.UTF8.GetString(ea.Body);
                try
                {
                    return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message;
                }
                catch(Exception ex)
                {
                    IsRunning = false;
                    return null;
                }
            }
        }
    }
    catch (EndOfStreamException ex)
    {
        IsRunning = false;
        return null;
    }
    return null;
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-07-15 10:40:29

尝试在RPC客户端代码中将DeliveryMode属性设置为非持久性(1),如下所示:

代码语言:javascript
复制
public Message Call(Message message)
{
   ...
   var props = _channel.CreateBasicProperties();
   props.DeliveryMode = 1; //you might want to do this in your RPC-Server as well
   ...
}

AMQP模型解释包含非常有用的资源,比如解释如何处理最后出现在死信队列中的消息。

文档中关于队列持久性的另一个有用说明:

持久队列被持久化到磁盘,从而幸存下来重新启动代理。不能持久的队列称为临时队列。并不是所有的场景和用例任务队列都是持久的。 队列的持久性不会使路由到该队列的消息持久。如果代理被删除,然后重新启动,则在代理启动期间将重新声明持久队列,但是,只有持久消息才会被恢复。

请注意,它讨论的是broker重启、而不是publisher或使用者重新启动。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31369854

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档