首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使@JmsListener (使用Spring和Atomikos)进行事务处理?

如何使@JmsListener (使用Spring和Atomikos)进行事务处理?
EN

Stack Overflow用户
提问于 2022-08-23 22:57:57
回答 1查看 203关注 0票数 2

我有一个Spring 应用程序,它使用来自队列的消息(ActiveMQ)并将它们写入数据库(DB2),我需要它是完全事务性的。我意识到事务管理器(使用spring-boot-starter-jta-atomikos)是分布式事务的最佳解决方案,我正试图正确地实现它。

JMS配置类:

代码语言:javascript
复制
@EnableJms
@Configuration
public class MQConfig {

  @Bean
  public ConnectionFactory connectionFactory() {
    RedeliveryPolicy rp = new RedeliveryPolicy();
    rp.setMaximumRedeliveries(3);
    rp.setRedeliveryDelay(1000L);

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
    cf.setBrokerURL("tcp://localhost:61616");
    cf.setRedeliveryPolicy(rp);
    return cf;
  }

  @Bean
  public JmsTemplate jmsTemplate() {
    JmsTemplate template = new JmsTemplate(connectionFactory());
    template.setConnectionFactory(connectionFactory());
    return template;
  }

  @Bean
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setCacheLevelName("CACHE_CONSUMER");
    factory.setReceiveTimeout(1000L);
    factory.setSessionTransacted(true);
    return factory;
  }
}

JMS侦听器类:

代码语言:javascript
复制
@Component
public class MQListener {

  @Autowired
  private ImportRecordsService importRecordsService;

  @JmsListener(
        containerFactory = "jmsListenerContainerFactory",
        destination = "test.queue"
        // concurrency = "4-10"
  )
  public void receiveMessage(TextMessage message) throws JMSException {
    importRecordsService.createRecord();
  }
}

写入DB的服务类:

代码语言:javascript
复制
@Service
public class ImportRecordsService {

  @Autowired
  private ImportRecordsDAO dao;

  @Transactional
  public void createRecord() {
    ImportRecord record = new ImportRecord();
    record.setDateCreated(LocalDateTime.now());
    record.setName("test-001");
    dao.save(record);
  }
}

如果保存后在createRecord()中抛出异常,回滚将按应有的方式工作。保存后,在receiveMessage()中的JMS中抛出异常时,消息将返回到队列,但数据库记录将保留。

任何帮助都非常感谢。

EN

回答 1

Stack Overflow用户

发布于 2022-11-04 23:56:52

这应该和将transactionManager添加到DefaultJmsListenerContainerFactory一样简单。

在本例中,将factory.setTransactionManager(jtaTransactionManager). (应该是可用的Spring )添加到jmsListenerContainerFactory方法签名中,然后调用PlatformTransactionManager

代码语言:javascript
复制
  @Bean
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, PlatformTransactionManager jtaTransactionManager) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setCacheLevelName("CACHE_CONSUMER");
    factory.setReceiveTimeout(1000L);
    factory.setTransactionManager(jtaTransactionManager);
    factory.setSessionTransacted(true);
    return factory;
  }

尽管您需要注意的是,设置transactionManager会将缓存级别重置为CACHE_NONE。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73465803

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档