我有两个经纪人托管在不同的物理机器上。我需要编写camel路由,它将从broker-1的入站队列中获取消息&将其发送到broker-2中配置的出站队列。现在,如果在任何情况下经纪人-2下降,那么流量应该被路由到第三个经纪人。由于这将是一个分布式事务,所以我猜XA事务(Springboot Atomikos)需要使用&为了将流量转移到第三代理,需要使用控制总线eip。但是由于我对骆驼并不熟悉,所以不知道该怎么做this..can,有人请你指导我吗?
POM
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<version>3.7.0</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-resilience4j</artifactId>
<version>3.4.5</version>use the same version as your Camel core version
</dependency> -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-resilience4j-starter</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-jta-atomikos -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>Config
@Configuration
public class JMSConfigManager {
@Bean(name = "activemq1")
public ActiveMQComponent createComponent1(ConnectionFactory factory, JtaTransactionManager jtaTransactionManager) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(jmsConnectionFactory1());
activeMQComponent.setTransactionManager(jtaTransactionManager);
//activeMQComponent.setLazyCreateTransactionManager(false);
activeMQComponent.setCacheLevelName("CACHE_CONSUMER");
activeMQComponent.setTransacted(false);
activeMQComponent.setDeliveryPersistent(true);
//activeMQComponent.setTransactionName("PROPAGATION_REQUIRED");
activeMQComponent.setAcknowledgementMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
return activeMQComponent;
}
@Bean(name = "activemq2")
public ActiveMQComponent createComponent2(ConnectionFactory factory, JtaTransactionManager jtaTransactionManager) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(jmsConnectionFactory2());
//activeMQComponent.setLazyCreateTransactionManager(false);
activeMQComponent.setCacheLevelName("CACHE_CONSUMER");
activeMQComponent.setTransactionManager(jtaTransactionManager);
activeMQComponent.setTransacted(false);
activeMQComponent.setDeliveryPersistent(true);
//activeMQComponent.setTransactionName("PROPAGATION_REQUIRED");
activeMQComponent.setAcknowledgementMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
return activeMQComponent;
}
@Primary
@Bean(name = "cf1") public ConnectionFactory jmsConnectionFactory1() {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
return connectionFactory;
}
@Bean(name = "cf2") public ConnectionFactory jmsConnectionFactory2() {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61617");
return connectionFactory;
}策略
@Configuration
public class TransactionConfig {
@Bean("policyPropagationRequired")
public SpringTransactionPolicy transactionPolicyPropagationRequired(
@Autowired JtaTransactionManager transactionmanager) {
SpringTransactionPolicy policy = new SpringTransactionPolicy(transactionmanager);
policy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return policy;
}
}路由
@Override
public void configure() throws Exception {
System.out.println("Test-1");
from("jms:INBOUND.Q?connectionFactory=cf1")
.transacted("policyPropagationRequired")
//.log(LoggingLevel.INFO, log, "******Inbound messages Received")
.to("jms:OUTBOUND.Q1?connectionFactory=cf2")
.end();
}发布于 2021-02-02 16:23:33
,因为这将是一个分布式事务,所以我猜XA事务(Springboot Atomikos)需要使用
这取决于:
如果您想要一个“防水”的代理端到端事务,您需要使用XA-transactions
的本地消费事务。
为了澄清最后一点:如果您使用本地代理事务,Camel在成功处理路由之前不会在使用者上提交消息。因此,如果发生任何错误,将发生回滚并重新传递消息。
其结果是一种边缘情况,可以成功地将消息发送到目标代理,但是Camel不再能够针对源代理提交消息。然后发生重发,路由再处理一次,相同的消息被传递两次(或更多次)。
因此,选择要么使用XA事务,要么对幂等消费者(补偿上述边缘情况)使用使用者事务。
用于将流量转移到第三代理,控制总线eip将被使用
您不能简单地使用Camel错误处理来路由到broker 3而不是broker 2吗?
另一种策略是构建某种代理集群(真正的集群、代理网络或其他什么)来封装从broker 2到broker 3的故障转移。
https://stackoverflow.com/questions/65957657
复制相似问题