我正在将json消息发布到rabbitmq中的队列中,它可以正常工作。但是,面对一个问题,我想使用发布队列中的所有数据(作为聊天应用程序),我必须使用所有的消息。
例如,队列中有9项,如下所示
{"Sender":123,"Message":"Test Message-1","Group":1}
{"Sender":123,"Message":"Test Message-2","Group":1}
{"Sender":123,"Message":"Test Message-3","Group":1}
{"Sender":123,"Message":"Test Message-4","Group":1}
{"Sender":567,"Message":"Test Message-5","Group":21}
{"Sender":123,"Message":"Test Message-6","Group":1}
{"Sender":456,"Message":"Test Message-7","Group":1}
{"Sender":456,"Message":"Test Message-8","Group":1}
{"Sender":123,"Message":"Test Message-9","Group":1} 这些所有消息都按我的意愿存储在队列中。但是,当我试图像下面这样使用api调用来收集它们时,它将不能正常工作。有时获取数据,但有时得不到任何数据并加到列表中。因此,是否有任何方法将所有或有限的数据输入到c#中的对象或数组中。因为所有示例都在使用控制台中的消息。我需要做个收藏品。
public IList<string> GetMessageFromQueue(string _key, bool AutoAck = false)
{
var _list = new List<string>();
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: _key,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var response = channel.QueueDeclarePassive(_key);
var _test= response.MessageCount;
var _test2 = response.ConsumerCount;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
_list.Add(message);
};
//if (_list.Count == 0)
// AutoAck = false;
channel.BasicConsume(queue: _key,
autoAck: AutoAck,
consumer: consumer);
}
return _list;
}和我的控制器
public IActionResult Collect(){
_queueClient.GetMessageFromQueue("myKey",true);
}此方法olsa由于BasicConsume的autoack属性而清除队列。我也尝试使用basicAck。
在rabbitmq/c#中向对象数组获取消息的最佳方法是什么?
发布于 2022-02-20 18:16:21
在我看来,您的函数GetMessageFromQueue正在经历设置所有内容的过程,但是立即将从函数中退出,而无需等待Received函数收集所有消息。
例如,这是您为从队列中收集消息而设置的内联函数:
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
_list.Add(message);
};然后,...but 2行之后,您只需立即退出该函数,而无需等待接收的函数将所有消息添加到列表中。
// exit function straight away!
return _list;我注意到在您的示例代码中,您可以获得队列中保存的消息的计数。这很好,因为它意味着您知道需要接收多少条消息。
var _test= response.MessageCount;因此,您可以尝试做的一件事是添加一个ManualResetEventSlim或SemaphoreSlim,在函数的底部等待,直到它发出信号,然后返回(可能有更好的方法来实现这一点,但这正是我现在突然想到的)。
例如,在函数的顶部创建一个ManualResetEventSlim事件
var msgsRecievedGate = new ManualResetEventSlim(false);然后,在退出函数之前,等待它被设置。
msgsRecievedGate.Wait();就像这样:
public IList<string> GetMessageFromQueue(string _key, bool AutoAck = false)
{
var _list = new List<string>();
// Setup synchronization event.
var msgsRecievedGate = new ManualResetEventSlim(false);
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: _key,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var response = channel.QueueDeclarePassive(_key);
var msgCount = response.MessageCount;
var msgRecieved = 0;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
msgRecieved++;
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
_list.Add(message);
if ( msgRecieved == msgCount )
{
// Set signal here
msgsRecievedGate.Set();
// exit function
return;
}
};
channel.BasicConsume(queue: _key,
autoAck: AutoAck,
consumer: consumer);
}
// Wait here until all messages are retrieved
msgsRecievedGate.Wait();
// now exit function!
return _list;
}请小心。我还没有测试过上面的代码,所以你的里程数是不同的。
https://stackoverflow.com/questions/71149566
复制相似问题