我正在处理一个使用rabbitmq队列的工作人员服务,并且我试图弄清楚如何处理通道关闭事件,例如:假设我的消费者在30分钟内没有对代理进行攻击,而代理为此关闭了通道。
我知道rabbitmq库(我正在使用C#库)将自动尝试在连接关闭时重新连接,但是当连接处于活动状态但通道被关闭时,最佳做法是什么?我可以为‘通道关闭’事件注册处理程序,但是除了记录之外,在这个处理程序中我应该做什么呢?毕竟,我想继续从相关的队列中消费。
下面是我的代码,我试图再次创建通道,但是我得到了超时异常:
var consumer = ...
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
channel.BasicQos(0, 100, false);
channel.ModelShutdown += (sender, args) =>
{
try
{
Log.Error($"channel was shut down");
channel = _connection.CreateModel();
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
channel.BasicQos(0, 100, false)
}
catch (Exception exception)
{
Log.Error(exception);
}发布于 2022-05-19 09:47:41
据我所知,这个问题与事件处理发生在试图访问RabbitMQ服务器的同一个线程中有关,或者存在一个锁阻止了通道的创建。我找到的最简单的解决方案就是只使用Task.Run(() => )
private async void RecreateChannel(object sender, ShutdownEventArgs e)
{
_logger.LogWarning($"Channel was shutdowned, whit reason: {e.ReplyText}, code: {e.ReplyCode}, trying to reconnect");
_channel.Dispose();
while (!_channel.IsOpen)
{
try
{
await Task.Run(() => _channel = InitChannel());
}
catch (Exception exception)
{
_logger.LogError(exception, "Failed to recreate channel, trying again");
}
}
_logger.LogInformation("Channel was recreated successfully");
}https://stackoverflow.com/questions/71829896
复制相似问题