我一直在使用一个spring批处理作业,它读取一个示例csv文件并将记录转储到一个表中。我的问题是关于重新启动,我在第三行的文件中引入了一个数据问题(太长了,无法插入)
在第一次运行中
插入前两行,第三行失败(正如预期的那样)
当我重新启动时
选择第四行,并处理文件的其余部分
所有的文档似乎都表明spring batch从它停止的地方重新开始,这是不是意味着第三次(问题记录)被认为是“尝试”的,因此不会再尝试?我以为所有的重启都会失败,直到我修复了这个文件。
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(1)
.reader(reader())
.processor(processor())
.writer(writer)
.taskExecutor(taskExecutor())
.throttleLimit(1)
.build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(step1)
.build();
}发布于 2021-01-02 15:48:52
请让我知道你有没有通过下面。如果不清楚,我可以在github中分享相同的示例项目
Spring Batch restart uncompleted jobs from the same execution and step
Spring Batch correctly restart uncompleted jobs in clustered environment
在生产中,我们总是使用“容错”,因此作业将拒绝错误数据并继续执行。稍后的操作将更正数据并再次重新执行作业。这样做的好处是可以连续处理海量数据,不需要等待数据校正。
请将您的代码与下面的代码进行比较
发布于 2021-01-05 16:41:37
您已经为作业设置了RunIdIncrementer,因此每次运行时都会有一个新的作业实例。您需要删除该增量器,并将该文件作为作业参数传递,以便在每次运行时具有相同的作业实例。使用这种方法,在修复文件之前,所有重启都将失败。
顺便说一句,如果你使用多线程步骤,你就不可能有可重启性。这是因为当使用多个线程时,状态将不一致。因此,您需要使用单线程步骤(删除任务执行器)。这里的文档中对此进行了解释:Multi-threaded step。
https://stackoverflow.com/questions/65535631
复制相似问题