首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring批量中的数据差异

spring批量中的数据差异
EN

Stack Overflow用户
提问于 2021-10-03 12:39:21
回答 1查看 60关注 0票数 0

我正在使用spring批处理从DB传输数据,并在Solr服务器中对它们进行索引。我注意到Solr中索引了确切数量的数据记录。但是当我检查数据时,我注意到我的Solr中有重复的行。关于DB和Solr中的确切行数,Spring批处理似乎以某种方式多次写入一条记录,而不是新的合法记录。

我更改了目标,将所有内容写入CSV文件,以确保这不是Solr问题。我可以重复这个问题。但是,不幸的是,这意味着我的CSV文件中也有重复的行。

下面是我的代码片段,我尽可能地使其简单。我必须使用一些分页来读取我的数据库,因为我有大约2000万行数据,读取所有数据都会使我的服务器崩溃。

代码语言:javascript
复制
    @Bean
    public Step indexingStep() {
        // @formatter:off
        return stepBuilder
            .get("stepOne")
            .<AddressInfo, AddressInfo> chunk(chunkSize)
            .reader(addressReader())
            .writer(itemWriter())
        .build();
        // @formatter:on
    }

@Bean
public FlatFileItemWriter<AddressInfo> itemWriter() {
  return new FlatFileItemWriterBuilder<AddressInfo>()
      .name("personItemWriter")
      .resource(new FileSystemResource(
          "data.csv"))
      .delimited().delimiter(", ")
      .names(new String[] { "addressDetailPid", "streetLocalityPid", 
             "localityPid"}).build();
}
@Bean
public ItemReader<AddressInfo> addressReader() {

    // @formatter:off
    HibernatePagingItemReader<AddressInfo> hibernatePagingItemReader =  new HibernatePagingItemReaderBuilder<AddressInfo>()
            .name("addReader")
            .sessionFactory(sessionFactory)
            .queryString("select f from AddressInfo f")
            .saveState(false)
            .pageSize(pageSize)
        .build();
    hibernatePagingItemReader.setUseStatelessSession(true);
    return hibernatePagingItemReader;
    // @formatter:on    
}

更新:我尝试使用"JpaPagingItemReader“解决了同样的问题。似乎是分页机制导致了这个问题。当我在大多数情况下使用基于指针的阅读器时,它工作得很好。但我必须对我的数据进行分区以防止OOM异常。我不相信Spring Batch有这个根本的问题,而且只有我看到了它。

这就是我的环境: jdk11 Spring Boot2.4.1 DB: postgres9Solr: 8.1

解决方法:看起来,我需要在JPQL中添加"order by“,如下所示:

代码语言:javascript
复制
@Bean
    public JpaPagingItemReader<AddressInfo> itemReader() {
        // @formatter:off
        return new JpaPagingItemReaderBuilder<AddressInfo>()
                    .name("creditReader")
                    .entityManagerFactory(emf)
                    .queryString("select c from AddressInfo c order by c.addressDetailPid ")
                    .pageSize(pageSize)
                    .build();
        // @formatter:on
    }
EN

回答 1

Stack Overflow用户

发布于 2021-10-05 08:52:55

可以使用fetchSizeHibernateCursorItemReader进行配置,以限制每个数据库往返过程中提取的行数。这20M行不会一次加载到内存中,而是以基于指针的方法进行流式处理。

面向块步骤的chunkSize值在这里也很关键。JVM至少应该有足够的内存来加载内存中的单个项目块。已处理的区块将被垃圾收集。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69424912

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档