首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring-amqp -消息处理延迟

Spring-amqp -消息处理延迟
EN

Stack Overflow用户
提问于 2018-06-06 11:31:23
回答 1查看 876关注 0票数 1

我们在RHEL7.0VM上部署了一个Java/spring/tomcat应用程序,它使用AlejandroRivera/embedded,它在war部署后立即启动Rabbitmq服务器,并与其连接。我们有多个队列,用于处理和过滤事件。

流程是这样的:

事件,我们收到->发布事件队列,->侦听器类过滤事件->发布到另一个队列以处理->,我们发布到另一个队列进行日志记录。

问题是:

  • 处理正常启动,我们可以看到消息在队列中流动,但经过一段时间,侦听器类停止接收事件。我们似乎能够将其发布到RabbitMQ频道,但它从未从队列中向侦听器发出。这似乎开始使事件在一段时间后被处理,一直持续到几分钟。负荷不是那么高,大约有200个事件,我们只关心其中的少数几个。

我们试过的是:

  • 最初,队列将预取设置为1,消费者为最小为2,最大为5,我们删除了预取,并添加了更多的使用者作为最大并发设置,但问题仍然存在,延迟只需要更长的时间,但几分钟后,处理开始大约需要20/30秒。

我们在日志中看到,我们将事件发布到队列中,我们看到日志显示,我们延迟将事件从队列中删除。所以在我们中间的代码中没有任何东西可以产生延迟。

据我们所知,其余的队列似乎正确地处理了消息,但正是这个队列陷入了这种停滞模式。

我看到的错误如下,但我使用它的含义,如果它是相关的:

代码语言:javascript
复制
Jun  4 11:16:04  server: [pool-3-thread-10] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Consumer org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer@70dfa413 (amq.ctag-VaWc-hv-VwcUPh9mTQTj7A) method handleDelivery for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198) threw an exception for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198)
Jun  4 11:16:04  server: java.io.IOException: Unknown consumerTag
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1266)
Jun  4 11:16:04  server: at sun.reflect.GeneratedMethodAccessor180.invoke(Unknown Source)
Jun  4 11:16:04  server: at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun  4 11:16:04  server: at java.lang.reflect.Method.invoke(Method.java:498)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955)
Jun  4 11:16:04  server: at com.sun.proxy.$Proxy119.basicCancel(Unknown Source)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:846)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Jun  4 11:16:04  server: at java.lang.Thread.run(Thread.java:748)

这种情况发生在应用程序关闭时,但我看到它发生在应用程序仍在运行时。

代码语言:javascript
复制
2018-06-05 13:22:45,443 ERROR CachingConnectionFactory$DefaultChannelCloseLogger - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 109, class-id=60, method-id=120)

我不知道如何解决这两个错误,也不知道它们是否相关。

这里是我的Spring配置:

代码语言:javascript
复制
<!-- Queues -->
<rabbit:queue id="monitorIncomingEventsQueue" name="MonitorIncomingEventsQueue"/>
<rabbit:queue id="interestingEventsQueue" name="InterestingEventsQueue"/>
<rabbit:queue id="textCallsEventsQueue" name="TextCallsEventsQueue"/>
<rabbit:queue id="callDisconnectedEventQueue" name="CallDisconnectedEventQueue"/>
<rabbit:queue id="incomingCallEventQueue" name="IncomingCallEventQueue"/>
<rabbit:queue id="eventLoggingQueue" name="EventLoggingQueue"/>

<!-- listeners -->
<bean id="monitorListener" class="com.example.rabbitmq.listeners.monitorListener"/>
<bean id="interestingEventsListener" class="com.example.rabbitmq.listeners.InterestingEventsListener"/>
<bean id="textCallsEventListener" class="com.example.rabbitmq.listeners.TextCallsEventListener"/>
<bean id="callDisconnectedEventListener" class="com.example.rabbitmq.listeners.CallDisconnectedEventListener"/>
<bean id="incomingCallEventListener" class="com.example.rabbitmq.listeners.IncomingCallEventListener"/>
<bean id="eventLoggingEventListener" class="com.example.rabbitmq.listeners.EventLoggingListener"/>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="40" acknowledge="none">
    <rabbit:listener queues="interestingEventsQueue" ref="interestingEventsListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="textCallsEventsQueue" ref="textCallsEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="callDisconnectedEventQueue" ref="callDisconnectedEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="30" acknowledge="none">
    <rabbit:listener queues="incomingCallEventQueue" ref="incomingCallEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="1" max-concurrency="3" acknowledge="none">
    <rabbit:listener queues="monitorIncomingEventsQueue" ref="monitorListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="10"  acknowledge="none">
    <rabbit:listener queues="EventLoggingQueue" ref="eventLoggingEventListener" method="handleLoggingEvent"/>
</rabbit:listener-container>

<rabbit:connection-factory id="connectionFactory" host="${host.name}" port="${port.number}" username="${user.name}" password="${user.password}" connection-timeout="20000"/>

我在这里读到,延迟处理可能是网络问题造成的,但在这种情况下,服务器和应用程序在同一个VM上。这是一个被锁定的环境,所以大多数端口都是不开放的,但我怀疑这是怎么回事。

更多日志:https://pastebin.com/4QMFDT7A

任何帮助都是感激的,

谢谢,

EN

回答 1

Stack Overflow用户

发布于 2018-06-06 13:34:20

我需要看到更多的日志--这是一把冒烟的枪:

代码语言:javascript
复制
Storing...Storing delivery for Consumer@a2ce092: tags=[{}]

(使用者) tags是空的,这意味着消费者当时已经被取消了(由于某种原因,这应该出现在日志的前面)。

如果您有可能使用1.7.9复制could快照,那么我添加了一些跟踪级别的日志记录,这将有助于诊断这一点。

编辑

回复你最近对狂犬病病毒使用者的评论.

您能尝试使用固定并发吗?Spring的容器中的变量并发性通常不是很有用,因为只有当整个容器空闲一段时间时,消费者通常才会减少。

然而,这或许可以解释为什么你看到消费者被取消。

也许在这个逻辑中存在一些竞争条件;使用固定数量的使用者(不要指定max.)会避免这种情况;如果你可以尝试,它至少会消除这种可能性。

尽管如此,我还是很困惑(在您的堆栈溢出配置中没有注意到这一点);对于acknowledge="none",不应该向代理发送ack(没有一个用于设置autoAck)。

代码语言:javascript
复制
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), ...

代码语言:javascript
复制
public boolean isAutoAck() {

    return this == NONE;

}

你在用你的代码发送密码吗?如果是这样,则ack模式应该是手动的。我看不出容器将发送ack为无ack模式的场景。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50719362

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档