我有一个Spring 应用程序,它使用来自队列的消息(ActiveMQ)并将它们写入数据库(DB2),我需要它是完全事务性的。我意识到事务管理器(使用spring-boot-starter-jta-atomikos)是分布式事务的最佳解决方案,我正试图正确地实现它。
JMS配置类:
@EnableJms
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory() {
RedeliveryPolicy rp = new RedeliveryPolicy();
rp.setMaximumRedeliveries(3);
rp.setRedeliveryDelay(1000L);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://localhost:61616");
cf.setRedeliveryPolicy(rp);
return cf;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate(connectionFactory());
template.setConnectionFactory(connectionFactory());
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setCacheLevelName("CACHE_CONSUMER");
factory.setReceiveTimeout(1000L);
factory.setSessionTransacted(true);
return factory;
}
}JMS侦听器类:
@Component
public class MQListener {
@Autowired
private ImportRecordsService importRecordsService;
@JmsListener(
containerFactory = "jmsListenerContainerFactory",
destination = "test.queue"
// concurrency = "4-10"
)
public void receiveMessage(TextMessage message) throws JMSException {
importRecordsService.createRecord();
}
}写入DB的服务类:
@Service
public class ImportRecordsService {
@Autowired
private ImportRecordsDAO dao;
@Transactional
public void createRecord() {
ImportRecord record = new ImportRecord();
record.setDateCreated(LocalDateTime.now());
record.setName("test-001");
dao.save(record);
}
}如果保存后在createRecord()中抛出异常,回滚将按应有的方式工作。保存后,在receiveMessage()中的JMS中抛出异常时,消息将返回到队列,但数据库记录将保留。
任何帮助都非常感谢。
发布于 2022-11-04 23:56:52
这应该和将transactionManager添加到DefaultJmsListenerContainerFactory一样简单。
在本例中,将factory.setTransactionManager(jtaTransactionManager). (应该是可用的Spring )添加到jmsListenerContainerFactory方法签名中,然后调用PlatformTransactionManager
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, PlatformTransactionManager jtaTransactionManager) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setCacheLevelName("CACHE_CONSUMER");
factory.setReceiveTimeout(1000L);
factory.setTransactionManager(jtaTransactionManager);
factory.setSessionTransacted(true);
return factory;
}尽管您需要注意的是,设置transactionManager会将缓存级别重置为CACHE_NONE。
https://stackoverflow.com/questions/73465803
复制相似问题