首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带有xa事务的Apache Camel路由、多个代理和控制总线

带有xa事务的Apache Camel路由、多个代理和控制总线
EN

Stack Overflow用户
提问于 2021-01-29 15:52:35
回答 1查看 254关注 0票数 1

我有两个经纪人托管在不同的物理机器上。我需要编写camel路由,它将从broker-1的入站队列中获取消息&将其发送到broker-2中配置的出站队列。现在,如果在任何情况下经纪人-2下降,那么流量应该被路由到第三个经纪人。由于这将是一个分布式事务,所以我猜XA事务(Springboot Atomikos)需要使用&为了将流量转移到第三代理,需要使用控制总线eip。但是由于我对骆驼并不熟悉,所以不知道该怎么做this..can,有人请你指导我吗?

POM

代码语言:javascript
复制
<dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
            <version>3.7.0</version>
        </dependency>
        <!-- <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-resilience4j</artifactId>
            <version>3.4.5</version>use the same version as your Camel core version
        </dependency> -->
        
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-resilience4j-starter</artifactId>
            <version>3.7.0</version>
        </dependency>
        
        
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-camel</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
            </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-jta-atomikos -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

Config

代码语言:javascript
复制
@Configuration
public class JMSConfigManager {
    @Bean(name = "activemq1")

    public ActiveMQComponent createComponent1(ConnectionFactory factory, JtaTransactionManager jtaTransactionManager) {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setConnectionFactory(jmsConnectionFactory1());
        activeMQComponent.setTransactionManager(jtaTransactionManager);
        //activeMQComponent.setLazyCreateTransactionManager(false);
        activeMQComponent.setCacheLevelName("CACHE_CONSUMER");
        activeMQComponent.setTransacted(false);
        activeMQComponent.setDeliveryPersistent(true);      
        //activeMQComponent.setTransactionName("PROPAGATION_REQUIRED");
        activeMQComponent.setAcknowledgementMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
        return activeMQComponent;
    }
    
    @Bean(name = "activemq2")

    public ActiveMQComponent createComponent2(ConnectionFactory factory, JtaTransactionManager jtaTransactionManager) {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setConnectionFactory(jmsConnectionFactory2());
        //activeMQComponent.setLazyCreateTransactionManager(false);
        activeMQComponent.setCacheLevelName("CACHE_CONSUMER");
        activeMQComponent.setTransactionManager(jtaTransactionManager);
        activeMQComponent.setTransacted(false);
        activeMQComponent.setDeliveryPersistent(true);
        //activeMQComponent.setTransactionName("PROPAGATION_REQUIRED");
        activeMQComponent.setAcknowledgementMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
        return activeMQComponent;
    }
    
      @Primary
      
      @Bean(name = "cf1") public ConnectionFactory jmsConnectionFactory1() {
      ActiveMQConnectionFactory connectionFactory = new
      ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL("tcp://localhost:61616"); 
      return  connectionFactory; 
      }
     
    
      @Bean(name = "cf2") public ConnectionFactory jmsConnectionFactory2() {
          ActiveMQConnectionFactory connectionFactory = new
      ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL("tcp://localhost:61617"); 
      return  connectionFactory; 
      }

策略

代码语言:javascript
复制
@Configuration
public class TransactionConfig {

    @Bean("policyPropagationRequired")
    public SpringTransactionPolicy transactionPolicyPropagationRequired(
            @Autowired JtaTransactionManager transactionmanager) {
        
        SpringTransactionPolicy policy = new SpringTransactionPolicy(transactionmanager);
        policy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return policy;
    }
}

路由

代码语言:javascript
复制
@Override
    public void configure() throws Exception {
        System.out.println("Test-1");
        
        from("jms:INBOUND.Q?connectionFactory=cf1")
        .transacted("policyPropagationRequired")
        //.log(LoggingLevel.INFO, log, "******Inbound messages Received")
          .to("jms:OUTBOUND.Q1?connectionFactory=cf2") 
          .end();
         
    }
EN

回答 1

Stack Overflow用户

发布于 2021-02-02 16:23:33

,因为这将是一个分布式事务,所以我猜XA事务(Springboot Atomikos)需要使用

这取决于:

如果您想要一个“防水”的代理端到端事务,您需要使用XA-transactions

  • If -您可以使用一个“重复-可能的事务”,您可以简化设置,并且只使用经纪公司

的本地消费事务

为了澄清最后一点:如果您使用本地代理事务,Camel在成功处理路由之前不会在使用者上提交消息。因此,如果发生任何错误,将发生回滚并重新传递消息。

其结果是一种边缘情况,可以成功地将消息发送到目标代理,但是Camel不再能够针对源代理提交消息。然后发生重发,路由再处理一次,相同的消息被传递两次(或更多次)。

因此,选择要么使用XA事务,要么对幂等消费者(补偿上述边缘情况)使用使用者事务。

用于将流量转移到第三代理,控制总线eip将被使用

您不能简单地使用Camel错误处理来路由到broker 3而不是broker 2吗?

另一种策略是构建某种代理集群(真正的集群、代理网络或其他什么)来封装从broker 2到broker 3的故障转移。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65957657

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档