我想要一个场景,有许多消费者监听新消息并处理它们。当消息不为空时,消费者应该始终在监听和处理消息。这是我写的代码:
string msg = null;
try
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
queueMessage = queue.getQueue("myQueue");
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "myQueue", noAck: false, consumer: consumer);
while (true)
{
consumer.Received += (model, ea) =>
{
var body = ea.Body;
msg = Encoding.UTF8.GetString(body);
if (msg != null)
{
executeTask(msg);
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.Dispose();
connection.Dispose();
consumer.Model.Dispose();
}//while(true)
}
}
catch (Exception e)
{
e.printStackTrace();
}过了一段时间,抛出一个OutOfMemoryException,很可能是因为没有“刷新”机制,接收的属性变满了。一个粗略的解决方案可能是实例化while中的连接、通道和消费者(True):
while (true)
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
queueMessage = queue.getQueue("myQueue");
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "myQueue", noAck: false, consumer: consumer);
consumer.Received += (model, ea) =>
{
...我一点也不相信这是一个好的解决方案,因为多个线程是连续打开和关闭的。可能是一个不同的类来处理这种行为,还是我遗漏了什么?
发布于 2020-10-17 01:51:27
首先,整个工厂连接和客户端应该在一个对象中创建,当你的服务/程序的生命周期继续时,你将始终引用这个对象,这样它们就不会被GC'd,特别是不要把它们放在using([])中。想要多个客户端?while分离线程和实例其次,您不需要使用‘->’和通道重新创建,因为一旦您将一些工作注册到consumer.Received,然后初始化channel.BasicConsume,工作调用将保留在内存中,并在每个到达使用者的入站消息上进行调用
https://stackoverflow.com/questions/45563297
复制相似问题