首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Mule XA事务中为ActiveMQ启用RedeliveryDelay

如何在Mule XA事务中为ActiveMQ启用RedeliveryDelay
EN

Stack Overflow用户
提问于 2015-07-13 21:43:24
回答 1查看 732关注 0票数 3

我在Mule中有一个非常简单的流,它监听一个失败队列,如果队列中的消息包含字符串“ActiveMQ”,就会抛出一个异常。没有XA事务,一切都会正常工作。消息在5秒后重新发送,下一次在几秒后重新发送。在5次重新传递之后,消息被移动到死信队列。

对于XA事务,消息被立即重新传递5次,然后移动到死信队列。我怎样才能告诉Mule或ActiveMQ等待RedeliveryDelay?

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
    xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts" xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
    xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.6.1"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">

    <spring:beans>

        <!-- Redelivery Policy -->
        <spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="maximumRedeliveries" value="5" />
            <spring:property name="initialRedeliveryDelay" value="5000" />
            <spring:property name="redeliveryDelay" value="5000" />
            <spring:property name="useExponentialBackOff" value="true" />
            <spring:property name="backOffMultiplier" value="1.5" />
        </spring:bean>

        <!-- ActiveMQ Connection factory -->
        <spring:bean id="amqXAFactory"
            class="org.apache.activemq.ActiveMQXAConnectionFactory" lazy-init="true"
            name="amqXAFactory">
            <spring:property name="brokerURL"
                value="tcp://localhost:61616" />
            <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" />
        </spring:bean>

    </spring:beans>

    <jbossts:transaction-manager doc:name="JBoss Transaction Manager">
        <property key="com.arjuna.ats.arjuna.coordinator.defaultTimeout"
            value="47" /><!-- this is in seconds -->
        <property key="com.arjuna.ats.arjuna.coordinator.txReaperTimeout"
            value="108000" /><!-- this is in milliseconds -->
    </jbossts:transaction-manager>

    <jms:activemq-connector name="Active_MQconnectorMessages"
        specification="1.1" validateConnections="true" doc:name="Active MQ for XA"
        persistentDelivery="true" connectionFactory-ref="amqXAFactory"
        numberOfConsumers="1" maxRedelivery="5">
        <reconnect-forever frequency="10000" />
    </jms:activemq-connector>

    <flow name="xatestFlow">
        <jms:inbound-endpoint queue="xa.test" doc:name="JMS"
            connector-ref="Active_MQconnectorMessages">
            <xa-transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Message from queue: #[payload]" level="INFO"
            doc:name="Logger" />
        <scripting:component doc:name="fail if payload contains fail">
            <scripting:script engine="Groovy"><![CDATA[if (payload.contains("fail")) {
  throw new IllegalArgumentException("Failed...")
}
]]></scripting:script>
        </scripting:component>
        <logger message="Message is ready" level="INFO" doc:name="Logger" />
    </flow>
</mule>
EN

回答 1

Stack Overflow用户

发布于 2015-09-21 19:17:28

虽然花了一些时间,但我还是找到了解决方案。有两种类型的配置,ActiveMQ中的服务器端和客户端。

在客户端redeliveryPolicy中,我将maximumRedeliveries更改为0。在服务器上,我在activemq.xml中创建了一个redeliveryPlugin:

代码语言:javascript
复制
    <broker ....>
    ......
    <plugins>
        <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
            <redeliveryPolicyMap>
                <redeliveryPolicyMap>
                    <redeliveryPolicyEntries>
                        <!-- a destination specific policy -->
                        <redeliveryPolicy 
                            queue="amm.input.to.router" maximumRedeliveries="3" 
                            redeliveryDelay="7000"
                            initialRedeliveryDelay="5000"
                            useCollisionAvoidance="true"
                            />
                    </redeliveryPolicyEntries>

                    <!-- the fallback policy for all other destinations -->
                    <defaultEntry>
                        <redeliveryPolicy
                            maximumRedeliveries="4"
                            useExponentialBackOff="true"
                            initialRedeliveryDelay="10000"
                            redeliveryDelay="5000"
                            useCollisionAvoidance="true"
                            backOffMultiplier="1.5"
                            maximumRedeliveryDelay="93600000" />
                    </defaultEntry>
                </redeliveryPolicyMap>
            </redeliveryPolicyMap>
        </redeliveryPlugin>
    </plugins>

    </broker>

现在,负责重新传输的是ActiveMQ服务器,而不是客户端。

请参阅http://activemq.apache.org/message-redelivery-and-dlq-handling.html

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31385056

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档