首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在同一事务下,从不同的线程调用多个DB调用?

如何在同一事务下,从不同的线程调用多个DB调用?
EN

Stack Overflow用户
提问于 2019-01-17 11:25:12
回答 1查看 885关注 0票数 0

我需要执行干净的insert (delete + insert),每个请求有大量的记录(接近100K)。出于测试目的,我使用10K测试我的代码。对于10K,操作运行了30秒,这是不可接受的。我正在执行spring-data-JPA提供的某种级别的批量插入。然而,结果并不令人满意。

我的代码如下所示

代码语言:javascript
复制
@Transactional 
  public void saveAll(HttpServletRequest httpRequest){  
  List<Person> persons = new ArrayList<>();
  try(ServletInputStream sis = httpRequest.getInputStream()){

         deletePersons(); //deletes all persons based on some criteria
         while((Person p = nextPerson(sis)) != null){

                 persons.add(p);
                 if(persons.size() % 2000 == 0){
                        savePersons(persons); //uses Spring repository to perform saveAll() and flush()
                        persons.clear();
                 }
         }
          savePersons(persons); //uses Spring repository to perform saveAll() and flush()
          persons.clear();
  }
}

@Transactional
public void savePersons(List<Persons> persons){

     System.out.println(new Date()+" Before save");
     repository.saveAll(persons);
     repository.flush();
     System.out.println(new Date()+" After save");
}

我还设置了以下属性

代码语言:javascript
复制
spring.jpa.properties.hibernate.jdbc.batch_size=40
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
spring.jpa.properties.hibernate.jdbc.batch_versioned_data=true
spring.jpa.properties.hibernate.id.new_generator_mappings=false

查看日志,我注意到insert操作大约需要3-4秒来保存2000条记录,但迭代时间不是很长。因此,我相信读完整个流所花费的时间不是一个瓶颈。但是插入物是。我还检查了日志,确认Spring正在根据属性集批量执行40次插入。

我正在尝试看看,如果有办法,我可以通过使用多个线程(比如2个线程)来提高性能,这些线程可以从阻塞队列中读取数据,一旦累积了2000条记录,就会调用save。我希望,在理论上,这可能会提供更好的结果。但问题是,据我所知,Spring在线程级别管理事务,而事务不能跨线程传播。但是我需要整个操作(delete + insert)都是原子的。我查看了几篇关于Spring事务管理的文章,但没有找到正确的方向。

有没有办法使用Spring事务来实现这种并行性?如果Spring transactions不是答案,还有没有其他可以使用的技术呢?

谢谢

EN

回答 1

Stack Overflow用户

发布于 2019-03-06 08:02:23

不确定这是否会对你有帮助-它在测试应用程序中运行良好。此外,不知道它是否会得到Spring高级人员的“好感”,但我希望学习,所以我发布了这个建议。

在Spring Boot测试应用中,以下代码将JPA存储库注入到ApplicationRunner中,然后将其注入到由ExecutorService管理的Runnables中。每个Runnable都获得一个由单独的KafkaConsumer持续填充的BlockingQueue (它充当队列的生产者)。Runnables使用queue.takes()从队列中弹出,后面跟着一个repo.save()。(可以很容易地将批量插入添加到线程,但由于应用程序还没有需要它,所以还没有这样做...)

这个测试应用程序目前为Postgres (或Timescale) DB实现了JPA,并运行了10个线程,其中10个队列由10个使用者提供。

JPA存储库由

代码语言:javascript
复制
public interface DataRepository extends JpaRepository<DataRecord, Long> {
}

Spring Boot主程序是

代码语言:javascript
复制
@SpringBootApplication
@EntityScan(basePackages = "com.xyz.model")
public class DataApplication {

    private final String[] topics = { "x0", "x1", "x2", "x3", "x4", "x5","x6", "x7", "x8","x9" };
    ExecutorService executor = Executors.newFixedThreadPool(topics.length);


    public static void main(String[] args) {
        SpringApplication.run(DataApplication.class, args);
    }

    @Bean
    ApplicationRunner init(DataRepository dataRepository) {
        return args -> {

            for (String topic : topics) {

                BlockingQueue<DataRecord> queue = new ArrayBlockingQueue<>(1024);
                JKafkaConsumer consumer = new JKafkaConsumer(topic, queue);
                consumer.start();

                JMessageConsumer messageConsumer = new JMessageConsumer(dataRepository, queue);
                executor.submit(messageConsumer);
            }
            executor.shutdown();
        };
    }
}

而Consumer Runnable有一个构造函数和run()方法,如下所示:

代码语言:javascript
复制
public JMessageConsumer(DataRepository dataRepository, BlockingQueue<DataRecord> queue) {
    this.queue = queue;
    this.dataRepository = dataRepository;
}

@Override
public void run() {
    running.set(true);
    while (running.get()) {
        // remove record from FIFO blocking queue
        DataRecord dataRecord;
        try {
            dataRecord = queue.take();
        } catch (InterruptedException e) {
            logger.error("queue exception: " + e.getMessage());
            continue;
        }
        // write to database 
        dataRepository.save(dataRecord);
    }
}

学习,所以任何想法/关注/反馈都是值得感谢的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54228571

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档