我正在尝试通过一个NMS (C#)消费者使用activeMQ来获取消息,进行一些处理,然后通过HttpClient.PostAsync()将内容发送到webserivce,所有这些都运行在一个windows服务中(通过Topshelf)。
我正在与之通信的下游系统非常敏感,我使用单独的确认,这样我就可以检查响应,并通过确认或触发自定义重试(即不是session.recover)来相应地采取行动。
由于下游系统不可靠,我一直在尝试几种不同的方法来减少我的消费者的吞吐量。我认为我可以通过转换为同步并使用预取来实现这一点,但它似乎没有起作用。
我的理解是,对于异步消费者,预取‘限制’永远不会被命中,但使用同步方法,预取队列只会在消息被确认时被吃掉,这意味着我可以调整侦听器,使其以下游组件可以处理的速率传递消息。
使用一个加载了100条消息的队列,并使用侦听器(即异步)启动我的代码,然后我可以成功地记录100条消息已经通过。当我将其更改为使用consumer.Receive() (或ReceiveNoWait)时,我永远不会收到消息。
以下是我为同步消费者所做的尝试的一个片段,包含了async选项,但已将其注释掉:
public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
consumer = session.CreateConsumer(queue);
// Asynchronous
//consumer.Listener += new MessageListener(OnMessage);
// Synchronous
var message = consumer.Receive(TimeSpan.FromSeconds(5));
while (true)
{
if (!Equals(message, null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
message.Acknowledge();
}发布于 2020-09-22 21:07:11
我认为你需要在你的connection上调用Start(),例如:
connection.Start();调用Start()表明您希望消息流动。
同样值得注意的是,除了从OnMessage抛出异常之外,没有办法跳出while(true)循环。
https://stackoverflow.com/questions/64009863
复制相似问题