我需要执行干净的insert (delete + insert),每个请求有大量的记录(接近100K)。出于测试目的,我使用10K测试我的代码。对于10K,操作运行了30秒,这是不可接受的。我正在执行spring-data-JPA提供的某种级别的批量插入。然而,结果并不令人满意。
我的代码如下所示
@Transactional
public void saveAll(HttpServletRequest httpRequest){
List<Person> persons = new ArrayList<>();
try(ServletInputStream sis = httpRequest.getInputStream()){
deletePersons(); //deletes all persons based on some criteria
while((Person p = nextPerson(sis)) != null){
persons.add(p);
if(persons.size() % 2000 == 0){
savePersons(persons); //uses Spring repository to perform saveAll() and flush()
persons.clear();
}
}
savePersons(persons); //uses Spring repository to perform saveAll() and flush()
persons.clear();
}
}
@Transactional
public void savePersons(List<Persons> persons){
System.out.println(new Date()+" Before save");
repository.saveAll(persons);
repository.flush();
System.out.println(new Date()+" After save");
}我还设置了以下属性
spring.jpa.properties.hibernate.jdbc.batch_size=40
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
spring.jpa.properties.hibernate.jdbc.batch_versioned_data=true
spring.jpa.properties.hibernate.id.new_generator_mappings=false查看日志,我注意到insert操作大约需要3-4秒来保存2000条记录,但迭代时间不是很长。因此,我相信读完整个流所花费的时间不是一个瓶颈。但是插入物是。我还检查了日志,确认Spring正在根据属性集批量执行40次插入。
我正在尝试看看,如果有办法,我可以通过使用多个线程(比如2个线程)来提高性能,这些线程可以从阻塞队列中读取数据,一旦累积了2000条记录,就会调用save。我希望,在理论上,这可能会提供更好的结果。但问题是,据我所知,Spring在线程级别管理事务,而事务不能跨线程传播。但是我需要整个操作(delete + insert)都是原子的。我查看了几篇关于Spring事务管理的文章,但没有找到正确的方向。
有没有办法使用Spring事务来实现这种并行性?如果Spring transactions不是答案,还有没有其他可以使用的技术呢?
谢谢
发布于 2019-03-06 08:02:23
不确定这是否会对你有帮助-它在测试应用程序中运行良好。此外,不知道它是否会得到Spring高级人员的“好感”,但我希望学习,所以我发布了这个建议。
在Spring Boot测试应用中,以下代码将JPA存储库注入到ApplicationRunner中,然后将其注入到由ExecutorService管理的Runnables中。每个Runnable都获得一个由单独的KafkaConsumer持续填充的BlockingQueue (它充当队列的生产者)。Runnables使用queue.takes()从队列中弹出,后面跟着一个repo.save()。(可以很容易地将批量插入添加到线程,但由于应用程序还没有需要它,所以还没有这样做...)
这个测试应用程序目前为Postgres (或Timescale) DB实现了JPA,并运行了10个线程,其中10个队列由10个使用者提供。
JPA存储库由
public interface DataRepository extends JpaRepository<DataRecord, Long> {
}Spring Boot主程序是
@SpringBootApplication
@EntityScan(basePackages = "com.xyz.model")
public class DataApplication {
private final String[] topics = { "x0", "x1", "x2", "x3", "x4", "x5","x6", "x7", "x8","x9" };
ExecutorService executor = Executors.newFixedThreadPool(topics.length);
public static void main(String[] args) {
SpringApplication.run(DataApplication.class, args);
}
@Bean
ApplicationRunner init(DataRepository dataRepository) {
return args -> {
for (String topic : topics) {
BlockingQueue<DataRecord> queue = new ArrayBlockingQueue<>(1024);
JKafkaConsumer consumer = new JKafkaConsumer(topic, queue);
consumer.start();
JMessageConsumer messageConsumer = new JMessageConsumer(dataRepository, queue);
executor.submit(messageConsumer);
}
executor.shutdown();
};
}
}而Consumer Runnable有一个构造函数和run()方法,如下所示:
public JMessageConsumer(DataRepository dataRepository, BlockingQueue<DataRecord> queue) {
this.queue = queue;
this.dataRepository = dataRepository;
}
@Override
public void run() {
running.set(true);
while (running.get()) {
// remove record from FIFO blocking queue
DataRecord dataRecord;
try {
dataRecord = queue.take();
} catch (InterruptedException e) {
logger.error("queue exception: " + e.getMessage());
continue;
}
// write to database
dataRepository.save(dataRecord);
}
}学习,所以任何想法/关注/反馈都是值得感谢的。
https://stackoverflow.com/questions/54228571
复制相似问题