首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将RabbitMQ消息转换为c#中的对象列表

如何将RabbitMQ消息转换为c#中的对象列表
EN

Stack Overflow用户
提问于 2022-02-16 21:35:18
回答 1查看 1.1K关注 0票数 0

我正在将json消息发布到rabbitmq中的队列中,它可以正常工作。但是,面对一个问题,我想使用发布队列中的所有数据(作为聊天应用程序),我必须使用所有的消息。

例如,队列中有9项,如下所示

代码语言:javascript
复制
{"Sender":123,"Message":"Test Message-1","Group":1}
{"Sender":123,"Message":"Test Message-2","Group":1}
{"Sender":123,"Message":"Test Message-3","Group":1}
{"Sender":123,"Message":"Test Message-4","Group":1}
{"Sender":567,"Message":"Test Message-5","Group":21}
{"Sender":123,"Message":"Test Message-6","Group":1}
{"Sender":456,"Message":"Test Message-7","Group":1}
{"Sender":456,"Message":"Test Message-8","Group":1}
{"Sender":123,"Message":"Test Message-9","Group":1} 

这些所有消息都按我的意愿存储在队列中。但是,当我试图像下面这样使用api调用来收集它们时,它将不能正常工作。有时获取数据,但有时得不到任何数据并加到列表中。因此,是否有任何方法将所有或有限的数据输入到c#中的对象或数组中。因为所有示例都在使用控制台中的消息。我需要做个收藏品。

代码语言:javascript
复制
public IList<string> GetMessageFromQueue(string _key, bool AutoAck = false)
        {
            var _list = new List<string>();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: _key,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var response = channel.QueueDeclarePassive(_key);
                var _test= response.MessageCount;
                var _test2 = response.ConsumerCount;

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    _list.Add(message); 
                };
                //if (_list.Count == 0)
                //    AutoAck = false;
                channel.BasicConsume(queue: _key,
                                     autoAck: AutoAck,
                                     consumer: consumer);

            }
            return _list;
        }

和我的控制器

代码语言:javascript
复制
public IActionResult Collect(){
    _queueClient.GetMessageFromQueue("myKey",true);
}

此方法olsa由于BasicConsume的autoack属性而清除队列。我也尝试使用basicAck。

在rabbitmq/c#中向对象数组获取消息的最佳方法是什么?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-02-20 18:16:21

在我看来,您的函数GetMessageFromQueue正在经历设置所有内容的过程,但是立即将从函数中退出,而无需等待Received函数收集所有消息。

例如,这是您为从队列中收集消息而设置的内联函数:

代码语言:javascript
复制
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    _list.Add(message); 
};

然后,...but 2行之后,您只需立即退出该函数,而无需等待接收的函数将所有消息添加到列表中。

代码语言:javascript
复制
// exit function straight away!
return _list;

我注意到在您的示例代码中,您可以获得队列中保存的消息的计数。这很好,因为它意味着您知道需要接收多少条消息。

代码语言:javascript
复制
var _test= response.MessageCount;

因此,您可以尝试做的一件事是添加一个ManualResetEventSlimSemaphoreSlim,在函数的底部等待,直到它发出信号,然后返回(可能有更好的方法来实现这一点,但这正是我现在突然想到的)。

例如,在函数的顶部创建一个ManualResetEventSlim事件

代码语言:javascript
复制
var msgsRecievedGate = new ManualResetEventSlim(false);

然后,在退出函数之前,等待它被设置。

代码语言:javascript
复制
msgsRecievedGate.Wait();

就像这样:

代码语言:javascript
复制
public IList<string> GetMessageFromQueue(string _key, bool AutoAck = false)
{
    var _list = new List<string>();
    
    // Setup synchronization event. 
    var msgsRecievedGate = new ManualResetEventSlim(false);
    
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: _key,
                             durable: false,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        var response = channel.QueueDeclarePassive(_key);
        
        var msgCount = response.MessageCount;
        var msgRecieved = 0;
        
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            msgRecieved++;
            
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            _list.Add(message); 
            
            if ( msgRecieved == msgCount )
            {
                // Set signal here
                msgsRecievedGate.Set();
                
                // exit function 
                return;
            }
        };
        
        
        channel.BasicConsume(queue: _key,
                             autoAck: AutoAck,
                             consumer: consumer);

    }
    
    // Wait here until all messages are retrieved
    msgsRecievedGate.Wait();
    
    // now exit function! 
    return _list;
}

请小心。我还没有测试过上面的代码,所以你的里程数是不同的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71149566

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档