我想知道为什么我的RabbitMQ RPC客户端总是在重启后处理死消息。_channel.QueueDeclare(queue, false, false, false, null);应该禁用缓冲区。如果我在RPC客户机中重载了QueueDeclare,我就无法连接到服务器。这里出什么问题了吗?知道怎么解决这个问题吗?
RPC-服务器
new Thread(() =>
{
var factory = new ConnectionFactory { HostName = _hostname };
if (_port > 0)
factory.Port = _port;
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue, false, false, false, null);
_channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(_channel);
_channel.BasicConsume(queue, false, consumer);
IsRunning = true;
while (IsRunning)
{
BasicDeliverEventArgs ea;
try {
ea = consumer.Queue.Dequeue();
}
catch (Exception ex) {
IsRunning = false;
}
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = _channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
var xmlRequest = Encoding.UTF8.GetString(body);
var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message;
var messageResponse = handler(messageRequest);
_channel.BasicPublish("", props.ReplyTo, replyProps,
messageResponse);
_channel.BasicAck(ea.DeliveryTag, false);
}
}).Start();RPC-客户端
public void Start()
{
if (IsRunning)
return;
var factory = new ConnectionFactory {
HostName = _hostname,
Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint)
: new AmqpTcpEndpoint(_endpoint, _port)
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_replyQueueName = _channel.QueueDeclare(); // Do not connect any more
_consumer = new QueueingBasicConsumer(_channel);
_channel.BasicConsume(_replyQueueName, true, _consumer);
IsRunning = true;
}
public Message Call(Message message)
{
if (!IsRunning)
throw new Exception("Connection is not open.");
var corrId = Guid.NewGuid().ToString().Replace("-", "");
var props = _channel.CreateBasicProperties();
props.ReplyTo = _replyQueueName;
props.CorrelationId = corrId;
if (!String.IsNullOrEmpty(_application))
props.AppId = _application;
message.InitializeProperties(_hostname, _nodeId, _uniqueId, props);
var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message));
_channel.BasicPublish("", _queue, props, messageBytes);
try
{
while (IsRunning)
{
var ea = _consumer.Queue.Dequeue();
if (ea.BasicProperties.CorrelationId == corrId)
{
var xmlResponse = Encoding.UTF8.GetString(ea.Body);
try
{
return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message;
}
catch(Exception ex)
{
IsRunning = false;
return null;
}
}
}
}
catch (EndOfStreamException ex)
{
IsRunning = false;
return null;
}
return null;
}发布于 2015-07-15 10:40:29
尝试在RPC客户端代码中将DeliveryMode属性设置为非持久性(1),如下所示:
public Message Call(Message message)
{
...
var props = _channel.CreateBasicProperties();
props.DeliveryMode = 1; //you might want to do this in your RPC-Server as well
...
}AMQP模型解释包含非常有用的资源,比如解释如何处理最后出现在死信队列中的消息。
文档中关于队列持久性的另一个有用说明:
持久队列被持久化到磁盘,从而幸存下来重新启动代理。不能持久的队列称为临时队列。并不是所有的场景和用例任务队列都是持久的。 队列的持久性不会使路由到该队列的消息持久。如果代理被删除,然后重新启动,则在代理启动期间将重新声明持久队列,但是,只有持久消息才会被恢复。
请注意,它讨论的是broker重启、而不是publisher或使用者重新启动。
https://stackoverflow.com/questions/31369854
复制相似问题