首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ACTIVEMQ + NMS无法同步接收

ACTIVEMQ + NMS无法同步接收
EN

Stack Overflow用户
提问于 2020-09-22 20:35:58
回答 1查看 90关注 0票数 0

我正在尝试通过一个NMS (C#)消费者使用activeMQ来获取消息,进行一些处理,然后通过HttpClient.PostAsync()将内容发送到webserivce,所有这些都运行在一个windows服务中(通过Topshelf)。

我正在与之通信的下游系统非常敏感,我使用单独的确认,这样我就可以检查响应,并通过确认或触发自定义重试(即不是session.recover)来相应地采取行动。

由于下游系统不可靠,我一直在尝试几种不同的方法来减少我的消费者的吞吐量。我认为我可以通过转换为同步并使用预取来实现这一点,但它似乎没有起作用。

我的理解是,对于异步消费者,预取‘限制’永远不会被命中,但使用同步方法,预取队列只会在消息被确认时被吃掉,这意味着我可以调整侦听器,使其以下游组件可以处理的速率传递消息。

使用一个加载了100条消息的队列,并使用侦听器(即异步)启动我的代码,然后我可以成功地记录100条消息已经通过。当我将其更改为使用consumer.Receive() (或ReceiveNoWait)时,我永远不会收到消息。

以下是我为同步消费者所做的尝试的一个片段,包含了async选项,但已将其注释掉:

代码语言:javascript
复制
    public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
    {
        log = logger;
        configuration = config;
        this.endpointClient = endpointClient;

        connection = connectionFactory.CreateConnection();
        connection.RedeliveryPolicy = GetRedeliveryPolicy();
        connection.ExceptionListener += new ExceptionListener(OnException);
        session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
        queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
        consumer = session.CreateConsumer(queue);

        // Asynchronous
        //consumer.Listener += new MessageListener(OnMessage);

        // Synchronous
        var message = consumer.Receive(TimeSpan.FromSeconds(5));
        while (true)
        {
            if (!Equals(message, null))
            {
                OnMessage(message);
            }
        }
    }

    public void OnMessage(IMessage message)
    {
        log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
        message.Acknowledge();
    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-09-22 21:07:11

我认为你需要在你的connection上调用Start(),例如:

代码语言:javascript
复制
connection.Start();

调用Start()表明您希望消息流动。

同样值得注意的是,除了从OnMessage抛出异常之外,没有办法跳出while(true)循环。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64009863

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档