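I am using spring-batch with spring-boot 2.5.6. I decided to use remote partitioning with Kafka as the middleware. I have one manager and three workers. Accordingly, one partition is assigned to the manager's input topic and three partitions are assigned to the workers' input topic.
The manager takes a file, creates multiple ExecutionContexts, and sends them over Kafka. The workers start processing the corresponding steps and send a message when they finish. The manager aggregates the workers' results and decides to complete the job once all workers are done. So far so good.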
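To make the flow above concrete, here is a minimal plain-Java sketch (no Spring, no Kafka) of the aggregation idea: the manager counts outstanding worker replies per job execution and treats a job execution as complete when its last reply arrives. The class and method names are hypothetical and chosen only for illustration; this is not how Spring Batch implements it internally.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: hypothetical names, not a Spring Batch API.
public class ReplyAggregationSketch {

    // jobExecutionId -> number of worker replies still outstanding
    private final Map<Long, Integer> pending = new HashMap<>();

    // Manager side: remember how many partitions were sent for a job execution.
    public void partitionsSent(long jobExecutionId, int partitionCount) {
        pending.put(jobExecutionId, partitionCount);
    }

    // Manager side: record one worker reply.
    // Returns true only when the last reply for that job execution has arrived.
    public boolean replyReceived(long jobExecutionId) {
        Integer remaining = pending.get(jobExecutionId);
        if (remaining == null) {
            throw new IllegalStateException("Unknown job execution: " + jobExecutionId);
        }
        if (remaining == 1) {
            pending.remove(jobExecutionId);
            return true;
        }
        pending.put(jobExecutionId, remaining - 1);
        return false;
    }

    public static void main(String[] args) {
        ReplyAggregationSketch manager = new ReplyAggregationSketch();
        manager.partitionsSent(1L, 3); // long-running job
        manager.partitionsSent(2L, 2); // short job
        System.out.println(manager.replyReceived(2L)); // false
        System.out.println(manager.replyReceived(2L)); // true
        System.out.println(manager.replyReceived(1L)); // false
    }
}
```

In this sketch, replies for job execution 2 can only ever complete job execution 2, which is the behaviour I expected from the real setup as well.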
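Now suppose I first run a long-running job that takes a lot of time to finish, and then run a small job that finishes quickly. Not surprisingly, the second job finishes sooner and emits a completed signal; the manager consumes this message and continues the process. I even checked AggregatingMessageHandler: the completed message relates only to the second job (the short-running one). I verified this with the jobExecutionId.
Now the problem happens. I have a JobListener with an afterJob method. This method is run against the first job (the long-running one that the workers are still processing), not the second job (the short-running one for which the completed signal was sent)! I can tell by looking at the jobExecutionId. This is really weird, because I never saw a completed signal for the first job in the logs.
After some time, when the first long-running job finishes, the last worker sends a completed message and the manager decides to finish the job. Now the JobListener is run against the second job (the short-running one)!
I can't understand what is going wrong. I would like to assume it is a misconfiguration, but by debugging the code and checking AggregatingMessageHandler and the trace logs in both the workers and the manager, I can clearly see that the messages are sent fine and there is nothing wrong with them. Any suggestions/ideas are welcome.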
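Update
Here is a sample implementation: suppose we have a Customer table. The job takes minId and maxId (the ID column in the Customer table is a simple number), and the manager then creates multiple ExecutionContexts based on the ID range.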
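As a rough illustration of the range splitting described above, the following plain-Java sketch divides an ID range into consecutive, non-overlapping chunks. It has no Spring dependency; the class name, the `split` method, and the `rangeSize` parameter are hypothetical, and it assumes that maxId is inclusive and that each chunk is a half-open range [start, end).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: hypothetical names, not part of the project below.
public class IdRangeSplitSketch {

    // Splits [minId, maxId] (maxId inclusive, by assumption) into consecutive
    // half-open ranges [start, end) that each cover at most rangeSize IDs.
    static Map<String, long[]> split(long minId, long maxId, long rangeSize) {
        if (rangeSize <= 0 || maxId < minId) {
            throw new IllegalArgumentException("invalid range or range size");
        }
        Map<String, long[]> result = new LinkedHashMap<>();
        long endExclusive = maxId + 1;
        int i = 0;
        for (long start = minId; start < endExclusive; start += rangeSize) {
            long end = Math.min(start + rangeSize, endExclusive);
            result.put("partition" + i++, new long[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints: partition0: [1, 11), partition1: [11, 21), partition2: [21, 26)
        split(1, 25, 10).forEach((name, range) ->
                System.out.println(name + ": [" + range[0] + ", " + range[1] + ")"));
    }
}
```

Each resulting pair would correspond to the minValue and maxValue entries of one ExecutionContext, matching a query of the form `id >= minValue and id < maxValue`.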
Manager configuration
package com.example.batchdemo.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;
@Profile("!worker")
@Configuration
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final JobExplorer jobExplorer;
private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
private final JobListener jobListener;
public JobConfiguration(JobBuilderFactory jobBuilderFactory, JobExplorer jobExplorer, RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory, JobListener jobListener) {
this.jobBuilderFactory = jobBuilderFactory;
this.jobExplorer = jobExplorer;
this.managerStepBuilderFactory = managerStepBuilderFactory;
this.jobListener = jobListener;
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(managerStep())
.listener(jobListener)
.build();
}
@Bean
public Step managerStep() {
return managerStepBuilderFactory.get("managerStep")
.partitioner("workerStep", rangePartitioner(null, null))
.outputChannel(requestForWorkers())
.inputChannel(repliesFromWorkers())
.jobExplorer(jobExplorer)
.build();
}
@Bean
@StepScope
public Partitioner rangePartitioner(@Value("#{jobParameters['minId']}") Integer minId, @Value("#{jobParameters['maxId']}") Integer maxId) {
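// NOTE: CustomerIdRangePartitioner's constructor takes (minId, maxId, gridSize).
// The third argument was missing here; 100 is an assumed placeholder for the number of IDs per partition.
return new CustomerIdRangePartitioner(minId, maxId, 100);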
}
////////////////////////////////////////////////////////////////////////////////////////////////
@Bean
public DirectChannel requestForWorkers() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
return IntegrationFlows
.from(requestForWorkers())
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("requestForWorkers"))
.route("requestForWorkers")
.get();
}
@Bean
public DirectChannel repliesFromWorkers() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
return IntegrationFlows
.from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("repliesFromWorkers")))
.channel(repliesFromWorkers())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
}
Worker configuration
package com.example.batchdemo.job;
import com.example.batchdemo.domain.Customer;
import com.example.batchdemo.domain.CustomerRowMapper;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Configuration
@Profile("worker")
public class WorkerConfiguration {
private static final int CHUNK_SIZE = 10;
private static final int WAITING_TIME = 3000;
public final DataSource dataSource;
private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
public WorkerConfiguration(DataSource dataSource, RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
this.dataSource = dataSource;
this.workerStepBuilderFactory = workerStepBuilderFactory;
}
@Bean
public DirectChannel repliesFromWorkers() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
return IntegrationFlows
.from(repliesFromWorkers())
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("repliesFromWorkers"))
.route("repliesFromWorkers")
.get();
}
@Bean
public DirectChannel requestForWorkers() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
return IntegrationFlows
.from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("requestForWorkers")))
.channel(requestForWorkers())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
/////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////
@Bean
public Step workerStep() {
SimpleStepBuilder workerStepBuilder = workerStepBuilderFactory.get("workerStep")
.inputChannel(requestForWorkers())
.outputChannel(repliesFromWorkers())
.<Customer, Customer>chunk(CHUNK_SIZE)
.reader(pagingItemReader(null, null))
.processor(itemProcessor())
.writer(customerItemWriter());
return workerStepBuilder.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<Customer> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minValue,
@Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
System.out.println("reading " + minValue + " to " + maxValue);
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from CUSTOMER");
queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
@StepScope
public ItemProcessor<Customer, Customer> itemProcessor() {
return item -> {
Thread.sleep(WAITING_TIME);
System.out.println(item);
return item;
};
}
@Bean
@StepScope
public ItemWriter<Customer> customerItemWriter() {
return items -> {
System.out.printf("%d items were written%n", items.size());
};
}
}
Partitioner:
package com.example.batchdemo.job;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.HashMap;
import java.util.Map;
public class CustomerIdRangePartitioner implements Partitioner {
private final int minId;
private final int maxId;
private final int gridSize;
public CustomerIdRangePartitioner(int minId, int maxId, int gridSize) {
this.minId = minId;
this.maxId = maxId;
this.gridSize = gridSize;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int number = (maxId - minId) / this.gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<>();
for (int i = 0; i < number; i++) {
ExecutionContext executionContext = new ExecutionContext();
int start = minId + (this.gridSize * i);
// Each partition covers exactly gridSize IDs; multiplying by (i + 1) would make ranges overlap.
int end = start + this.gridSize;
executionContext.putInt("minValue", start);
executionContext.putInt("maxValue", Math.min(end, maxId));
result.put("partition" + i, executionContext);
}
return result;
}
}
JobListener
package com.example.batchdemo.job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;
@Component
@JobScope
public class JobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobId() + " was finished: " + jobExecution.getStatus());
}
}
AppConfiguration
package com.example.batchdemo.controller;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class AppConfiguration {
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final JobRegistry jobRegistry;
private final ApplicationContext applicationContext;
public AppConfiguration(JobExplorer jobExplorer, JobRepository jobRepository, JobRegistry jobRegistry, ApplicationContext applicationContext) {
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.jobRegistry = jobRegistry;
this.applicationContext = applicationContext;
}
@Bean
public synchronized JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();
registrar.setJobRegistry(jobRegistry);
registrar.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
registrar.afterPropertiesSet();
return registrar;
}
@Bean
public JobOperator jobOperator() throws Exception {
SimpleJobOperator simpleJobOperator = new SimpleJobOperator();
simpleJobOperator.setJobLauncher(getJobLauncher());
simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
simpleJobOperator.setJobRepository(this.jobRepository);
simpleJobOperator.setJobExplorer(this.jobExplorer);
simpleJobOperator.setJobRegistry(this.jobRegistry);
simpleJobOperator.afterPropertiesSet();
return simpleJobOperator;
}
@Bean
public JobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(jobOperatorExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public ThreadPoolTaskExecutor jobOperatorExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(64);
threadPoolTaskExecutor.setMaxPoolSize(256);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>batch-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>batch-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
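</project>
Posted on 2022-06-22 15:39:20
This is a bug in Spring Batch. The listener is indeed called for the job that finishes earlier, with the wrong JobExecution instance. Making the JobExecutionListener job-scoped does not solve the issue.
I will re-open the issue on GitHub for further investigation.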
https://stackoverflow.com/questions/72601168