首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache ActiveMQ Artemis如何调查消息是否丢失?

Apache ActiveMQ Artemis如何调查消息是否丢失?
EN

Stack Overflow用户
提问于 2021-09-23 15:46:36
回答 1查看 330关注 0票数 0

我使用JMS 2.16.0作为我的代理,使用artemis-jms-client-2.16.0.jar作为我的ActiveMQ客户端。它感觉我正在随机丢失一些消息,因为我不知道的原因。我研究了我的Java代码,没有发现任何异常。

我有一个方法

代码语言:javascript
复制
@JmsListener(destination = "${myQueue}", containerFactory = "jmsListenerContainerFactory")
@Override
public void process(Message message) {
    try {
        processMessage(Message message);
    } catch (Exception ex) {
        LOG.error("Error[...]", ex);
        responseSender.send(otherQueue, message, ex);
    }
}

processMessage(Message message)方法如下所示:

代码语言:javascript
复制
public void processMessage(Message message) {
    try {
        byte[] request = message.getBody(byte[].class);
        [...]
        if (!condition) {
            throw new MyBusinessError("error happened");
        }
        [...]
    } finally {
        MDC.remove(ID);
    } 
}
代码语言:javascript
复制
@Bean(name = "jmsListenerContainerFactoryTest")
@Primary
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    factory.setSessionTransacted(true);
    factory.setConnectionFactory(cachingConnectionFactory());
    return factory;
}
代码语言:javascript
复制
public class MyBusinessException extends Exception {
    private int code;
    [...]
}

broker.xml

代码语言:javascript
复制
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>

      <persistence-enabled>true</persistence-enabled>
      
      <journal-type>NIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <!--
       This value was determined through a calculation.
       Your system could perform 2,17 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 490000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->
      <journal-buffer-timeout>490000</journal-buffer-timeout>


      <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
       -->
      <journal-max-io>1</journal-max-io>

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>90</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <page-sync-timeout>460000</page-sync-timeout>

      <acceptors>
         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq, admin"/>
         </security-setting>
      </security-settings>
      <connection-ttl-override>60000</connection-ttl-override>
      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
                     <!-- <config-delete-queues>FORCE</config-delete-queues>
                      <config-delete-addresses>FORCE</config-delete-addresses>-->
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
         </address-setting>
      </address-settings>

      <addresses>
        <address name="MyQueue">
            <anycast>
               <queue name="MyQueue">
               </queue>
            </anycast>
         </address>
         <address name="MyOtherQueue">
            <anycast>
               <queue name="MyOtherQueue" />
            </anycast>
         </address>                                                 
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>

   </core>
</configuration>

如果抛出MyBusinessError(...),想法是捕捉异常并向myOtherQueue发送完全相同的消息。如果发送该消息失败(即发生异常),则第二次重新发送该消息,以此类推,最多发送10次,然后发送到DLQ。在本质上,这是我大多数时候看到的,但在我的日志中,在随机时刻,我只看到一次尝试重新传递消息,并且没有消息在DLQ中,接收端抱怨没有消息。它觉得这条信息遗漏了。我已经使用Artemis控制台和JmsToolbox用放大镜查看了myOtherQueue,但除了一个空队列,什么也看不到。我在这个队列上没有消费者。

目的不是将失败消息发送到DLQ,而是发送到另一个队列(myOtherQueue)以供以后调查。如果发生无法将消息传递到该队列的情况,则会将消息放在DLQ上。我就是这么想的。

在一天结束的时候,随机地很少的消息丢失,这就是我试图理解的。我应该如何调查Artemis并查看是否发生了任何消息丢失?从哪里开始?使用什么工具?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-23 16:37:45

我将首先在每条消息中放置一个属性,以便能够对其进行唯一标识,然后记录该值,以便以后可以将客户端和代理日志关联起来。如果你使用的是JMS,那么你可以使用下面这样的代码:

代码语言:javascript
复制
String uuid = java.util.UUID.randomUUID().toString();
message.setStringProperty("UUID", uuid);
logger.info("Sending message with UUID: " + uuid);

当然,你也会想要在消费者上记录这一点,例如:

代码语言:javascript
复制
Message message = consumer.receive();
String uuid = message.getStringProperty("UUID");
logger.info("Received message with UUID: " + uuid);

在您的代理上,您应该激活audit logging,或者使用LoggingActiveMQServerPlugin

一旦您有了所有的登录,您只需等待,直到您认为您已经丢失了一条消息,然后检查日志,以找到已发送但未收到的消息的ID。一旦您知道了这一点,您就可以查看broker日志,看看broker是否正确地接收到了它,是否将其分发给了消费者等。这将帮助您缩小问题所在的范围。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69303170

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档