
Spring Batch remote partitioning with Kafka as middleware always completes the oldest job in the JobListener

Stack Overflow user
Asked 2022-06-13 10:14:36
Answers: 1 · Views: 243 · Followers: 0 · Votes: 0

I am using Spring Batch with Spring Boot 2.5.6. I decided to use remote partitioning with Kafka as the middleware. I have one manager and three workers, so the manager's input topic has one partition and the workers' input topic has three partitions.
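For reference, one way to declare topics with those partition counts is through `NewTopic` beans, which a Boot-auto-configured `KafkaAdmin` picks up at startup. This is a minimal sketch, not part of the original question; the class name `TopicConfiguration` is illustrative, and the topic names match the channel adapters in the configurations below:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfiguration {

    // Workers' input topic: three partitions, one per worker instance.
    @Bean
    public NewTopic requestForWorkersTopic() {
        return TopicBuilder.name("requestForWorkers").partitions(3).replicas(1).build();
    }

    // Manager's input (replies) topic: a single partition.
    @Bean
    public NewTopic repliesFromWorkersTopic() {
        return TopicBuilder.name("repliesFromWorkers").partitions(1).replicas(1).build();
    }
}
```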

The manager picks up a file, creates multiple ExecutionContexts, and sends them over Kafka. The workers run the corresponding steps and send a message back when they finish. The manager aggregates the workers' results and decides that the job is complete once all workers are done. So far, so good.

Now suppose I first run a long-running job that takes a lot of time to finish, and then run a small job that finishes quickly. Unsurprisingly, the second job finishes sooner and emits a completed message; the manager consumes this message and continues the process. I even checked in the AggregatingMessageHandler that the completed message belongs only to the second job (the short-running one) by inspecting its jobExecutionId.
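The timing in this scenario can be illustrated without any Spring Batch machinery: two tasks of different durations submitted to a pool, where the one submitted second completes first. A framework-free sketch (class and task names are illustrative):

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrder {

    // Submits a long task first and a short task second;
    // the short task's completion signal arrives first.
    static String[] completionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> done = new ExecutorCompletionService<>(pool);
            done.submit(() -> { Thread.sleep(500); return "job-1 (long-running)"; });
            done.submit(() -> { Thread.sleep(50); return "job-2 (short-running)"; });
            // take() returns results in completion order, not submission order
            return new String[]{done.take().get(), done.take().get()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] order = completionOrder();
        System.out.println("first completion signal: " + order[0]);
        System.out.println("second completion signal: " + order[1]);
    }
}
```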

Here is where the problem occurs. I have a JobListener with an afterJob method. This method runs for the first job (the long-running one the workers are still processing) instead of the second job (the one whose completed signal was just sent)! I can tell by looking at the jobExecutionId. This is really strange, because I never see a completed signal for the first job in the logs.

Some time later, when the first, long-running job finally finishes, the last worker sends a completed message and the manager decides to finish that job; now the JobListener runs for the second job (the short-running one)!

I cannot figure out what is going wrong. I wanted to assume a misconfiguration, but by debugging the code and inspecting the AggregatingMessageHandler and the trace logs on both the workers and the manager, I can clearly see that the messages are sent and received just fine and there is nothing wrong with them. Any suggestions or ideas are welcome.

Update

Below is a sample implementation. Suppose we have a Customer table. The job takes minId and maxId (the ID column in the Customer table is a simple number), and the manager creates multiple ExecutionContexts based on the ID ranges.

Manager configuration

package com.example.batchdemo.job;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;

@Profile("!worker")
@Configuration
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final JobExplorer jobExplorer;
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
    private final JobListener jobListener;

    public JobConfiguration(JobBuilderFactory jobBuilderFactory, JobExplorer jobExplorer, RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory, JobListener jobListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.jobExplorer = jobExplorer;
        this.managerStepBuilderFactory = managerStepBuilderFactory;
        this.jobListener = jobListener;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(managerStep())
                .listener(jobListener)
                .build();
    }

    @Bean
    public Step managerStep() {
        return managerStepBuilderFactory.get("managerStep")
                .partitioner("workerStep", rangePartitioner(null, null, null))
                .outputChannel(requestForWorkers())
                .inputChannel(repliesFromWorkers())
                .jobExplorer(jobExplorer)
                .build();
    }

    @Bean
    @StepScope
    public Partitioner rangePartitioner(@Value("#{jobParameters['minId']}") Integer minId,
                                        @Value("#{jobParameters['maxId']}") Integer maxId,
                                        @Value("#{jobParameters['gridSize']}") Integer gridSize) {
        // assumes a 'gridSize' job parameter (the width of each ID range),
        // matching the three-argument CustomerIdRangePartitioner constructor
        return new CustomerIdRangePartitioner(minId, maxId, gridSize);
    }


    ////////////////////////////////////////////////////////////////////////////////////////////////

    @Bean
    public DirectChannel requestForWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
        // the one-way outbound channel adapter terminates the flow,
        // so no further routing is declared after .handle(...)
        return IntegrationFlows
                .from(requestForWorkers())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("requestForWorkers"))
                .get();
    }

    @Bean
    public DirectChannel repliesFromWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("repliesFromWorkers")))
                .channel(repliesFromWorkers())
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

}

Worker configuration

package com.example.batchdemo.job;

import com.example.batchdemo.domain.Customer;
import com.example.batchdemo.domain.CustomerRowMapper;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.scheduling.support.PeriodicTrigger;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@Profile("worker")
public class WorkerConfiguration {

    private static final int CHUNK_SIZE = 10;
    private static final int WAITING_TIME = 3000;

    public final DataSource dataSource;
    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

    public WorkerConfiguration(DataSource dataSource, RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
        this.dataSource = dataSource;
        this.workerStepBuilderFactory = workerStepBuilderFactory;
    }

    @Bean
    public DirectChannel repliesFromWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate) {
        // the one-way outbound channel adapter terminates the flow,
        // so no further routing is declared after .handle(...)
        return IntegrationFlows
                .from(repliesFromWorkers())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("repliesFromWorkers"))
                .get();
    }

    @Bean
    public DirectChannel requestForWorkers() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("requestForWorkers")))
                .channel(requestForWorkers())
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

    /////////////////////////////////////////////////////////////////
    /////////////////////////////////////////////////////////////////

    @Bean
    public Step workerStep() {
        return workerStepBuilderFactory.get("workerStep")
                .inputChannel(requestForWorkers())
                .outputChannel(repliesFromWorkers())
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader(null, null))
                .processor(itemProcessor())
                .writer(customerItemWriter())
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minValue,
                                                           @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from CUSTOMER");
        queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    @Bean
    @StepScope
    public ItemProcessor<Customer, Customer> itemProcessor() {
        return item -> {
            Thread.sleep(WAITING_TIME);
            System.out.println(item);
            return item;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Customer> customerItemWriter() {
        return items -> {
            System.out.printf("%d items were written%n", items.size());
        };
    }

}

The partitioner:

package com.example.batchdemo.job;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;

public class CustomerIdRangePartitioner implements Partitioner {

    private final int minId;
    private final int maxId;
    private final int gridSize;

    public CustomerIdRangePartitioner(int minId, int maxId, int gridSize) {
        this.minId = minId;
        this.maxId = maxId;
        this.gridSize = gridSize;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // this.gridSize is the width of each ID range; the method parameter is unused here
        int number = (maxId - minId) / this.gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < number; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            int start = minId + (this.gridSize * i);
            // each partition covers the half-open range [start, start + gridSize),
            // capped at maxId + 1 since the reader's where clause uses "id < maxValue"
            int end = start + this.gridSize;
            executionContext.putInt("minValue", start);
            executionContext.putInt("maxValue", Math.min(end, maxId + 1));
            result.put("partition" + i, executionContext);
        }

        return result;
    }
}
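To make the ranges the partitioner produces concrete: with minId=1, maxId=95 and a range width of 10, it yields ten partitions, [1, 11), [11, 21), …, [91, 96). A framework-free mirror of that arithmetic (the class name `RangeSplitDemo` is illustrative, and plain int pairs stand in for the ExecutionContext entries):

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitDemo {

    // Partition i covers [minId + width*i, minId + width*(i+1)),
    // capped at maxId + 1 because the reader selects "id >= minValue and id < maxValue".
    static List<int[]> split(int minId, int maxId, int width) {
        int number = (maxId - minId) / width + 1;
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < number; i++) {
            int start = minId + width * i;
            int end = Math.min(start + width, maxId + 1);
            ranges.add(new int[]{start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(1, 95, 10)) {
            System.out.println("minValue=" + r[0] + ", maxValue=" + r[1]);
        }
    }
}
```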

JobListener

package com.example.batchdemo.job;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;

@Component
@JobScope
public class JobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {

    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobId() + " was finished: " + jobExecution.getStatus());
    }

}

AppConfiguration

package com.example.batchdemo.controller;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class AppConfiguration {

    private final JobExplorer jobExplorer;
    private final JobRepository jobRepository;
    private final JobRegistry jobRegistry;
    private final ApplicationContext applicationContext;

    public AppConfiguration(JobExplorer jobExplorer, JobRepository jobRepository, JobRegistry jobRegistry, ApplicationContext applicationContext) {
        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
        this.jobRegistry = jobRegistry;
        this.applicationContext = applicationContext;
    }

    @Bean
    public synchronized JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
        JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();

        registrar.setJobRegistry(jobRegistry);
        registrar.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        registrar.afterPropertiesSet();

        return registrar;
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator simpleJobOperator = new SimpleJobOperator();

        simpleJobOperator.setJobLauncher(getJobLauncher());
        simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
        simpleJobOperator.setJobRepository(this.jobRepository);
        simpleJobOperator.setJobExplorer(this.jobExplorer);
        simpleJobOperator.setJobRegistry(this.jobRegistry);

        simpleJobOperator.afterPropertiesSet();

        return simpleJobOperator;
    }

    @Bean
    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(jobOperatorExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public ThreadPoolTaskExecutor jobOperatorExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(64);
        threadPoolTaskExecutor.setMaxPoolSize(256);
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }

}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>batch-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>batch-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1 Answer

Stack Overflow user

Accepted answer

Answered 2022-06-22 15:39:20

This is a bug in Spring Batch. The listener is indeed invoked for the job that finishes early, but with the wrong JobExecution instance. Making the JobExecutionListener job-scoped does not fix the problem.

I will reopen the GitHub issue for further investigation.

Votes: 0
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/72601168
