首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >C# RabbitMQ - EventingBasicConsumer.Received OutOfMemory

C# RabbitMQ - EventingBasicConsumer.Received OutOfMemory
EN

Stack Overflow用户
提问于 2017-08-08 16:34:48
回答 1查看 872关注 0票数 0

我想要一个场景,有许多消费者监听新消息并处理它们。当消息不为空时,消费者应该始终在监听和处理消息。这是我写的代码:

代码语言:javascript
复制
        string msg = null;

        try
        {
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {

                queueMessage = queue.getQueue("myQueue");
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "myQueue", noAck: false, consumer: consumer);

                while (true)
                {


                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        msg = Encoding.UTF8.GetString(body);
                        if (msg != null)
                        {
                            executeTask(msg);
                        }
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

                    };
                    channel.Dispose();
                    connection.Dispose();
                    consumer.Model.Dispose();
                }//while(true)
            }
        }
        catch (Exception e)
        { 
         e.printStackTrace();
        }

过了一段时间,抛出一个OutOfMemoryException,很可能是因为没有“刷新”机制,接收的属性变满了。一个粗略的解决方案可能是实例化while中的连接、通道和消费者(True):

代码语言:javascript
复制
        while (true)
        {
        using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {

                queueMessage = queue.getQueue("myQueue");
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "myQueue", noAck: false, consumer: consumer);
                consumer.Received += (model, ea) =>
                {
                ...

我一点也不相信这是一个好的解决方案,因为多个线程是连续打开和关闭的。可能是一个不同的类来处理这种行为,还是我遗漏了什么?

EN

回答 1

Stack Overflow用户

发布于 2020-10-17 01:51:27

首先,整个工厂连接和客户端应该在一个对象中创建,当你的服务/程序的生命周期继续时,你将始终引用这个对象,这样它们就不会被GC'd,特别是不要把它们放在using([])中。想要多个客户端?while分离线程和实例其次,您不需要使用‘->’和通道重新创建,因为一旦您将一些工作注册到consumer.Received,然后初始化channel.BasicConsume,工作调用将保留在内存中,并在每个到达使用者的入站消息上进行调用

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45563297

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档