首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring数据流:异步DeploymentPartitionHanlder

Spring数据流:异步DeploymentPartitionHanlder
EN

Stack Overflow用户
提问于 2021-06-24 13:56:02
回答 2查看 206关注 0票数 0

TL;博士

我使用这个例子构建了一个简单的应用程序,它使用Spring (远程分区)和Spring数据流在Kubernetes上部署工人荚。

查看在Kubernetes上创建的"partitionedJob“吊舱的日志,我看到工作步骤( pod )是按顺序启动的。发射一个工人舱所需的时间约为10-15秒(有时高达2分钟,如下所示)。结果,工人吊舱的发射间隔为10-15秒,一个接一个。

日志:

代码语言:javascript
复制
[info 2021/06/26 14:30:29.089 UTC <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{maxWorkers=40, chunkSize=5000, run.id=13, batch.worker-app=docker://docker-myhost.artifactrepository.net/my-project/myjob:0.1, grideSize=40}]

[info 2021/06/26 14:30:29.155 UTC <main> tid=0x1] The job execution id 26 was run within the task execution 235

[info 2021/06/26 14:30:29.184 UTC <main> tid=0x1] Executing step: [masterStep]

2021-06-26 14:30:29 INFO  AuditRecordPartitioner:51 - Creating partitions. [gridSize=40]

[info 2021/06/26 14:32:41.128 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker entry point style: exec

--在Kubernetes上创建40个豆荚大约需要7-8分钟.(有时这个数字高达20分钟)最理想的是所有分区步骤(工人荚)一次异步启动。

问题:我们如何配置Spring /Spring批处理来异步/并行地启动工作人员荚(分区步骤),而不是顺序启动?如果SCDF确实是一次创建40个分区,为什么在现实中,主任务是以非常慢的速度一个一个地创建这些分区?(如日志中所示)。我不认为这是一个下面的问题,因为我能够使用任务DSL以快速的速度启动任务

相关代码:

代码语言:javascript
复制
@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}


/**
 * 
 * Main job controller
 * 
 * 
 */
@Profile("master")
@Configuration
public class MasterConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MasterConfiguration.class);

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory) {
        LOGGER.info("Creating job...");
        SimpleJobBuilder jobBuilder = jobBuilderFactory.get("job").start(masterStep(null, null, null));

        jobBuilder.incrementer(new RunIdIncrementer());

        return jobBuilder.build();
    }

    @Bean
    public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner,
            PartitionHandler partitionHandler) {
        LOGGER.info("Creating masterStep");
        return stepBuilderFactory.get("masterStep").partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler).build();
    }

    @Bean
    public DeployerPartitionHandler partitionHandler(@Value("${spring.profiles.active}") String activeProfile,
            @Value("${batch.worker-app}") String resourceLocation,
            @Value("${spring.application.name}") String applicationName, ApplicationContext context,
            TaskLauncher taskLauncher, JobExplorer jobExplorer, ResourceLoaderResolver resolver) {
        ResourceLoader resourceLoader = resolver.get(resourceLocation);
        Resource resource = resourceLoader.getResource(resourceLocation);
        DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep");

        List<String> commandLineArgs = new ArrayList<>();
        commandLineArgs.add("--spring.profiles.active=" + activeProfile.replace("master", "worker"));
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");

        commandLineArgs.addAll(Arrays.stream(applicationArguments.getSourceArgs()).filter(
                x -> !x.startsWith("--spring.profiles.active=") && !x.startsWith("--spring.cloud.task.executionid="))
                .collect(Collectors.toList()));
        commandLineArgs.addAll(applicationArguments.getNonOptionArgs());

        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());

        List<String> nonOptionArgs = applicationArguments.getNonOptionArgs();

        partitionHandler.setMaxWorkers(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 3)));
        partitionHandler.setGridSize(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 4)));
        partitionHandler.setApplicationName(applicationName);

        return partitionHandler;
    }

    @Bean("auditRecordPartitioner")
    public Partitioner auditRecordPartitioner() {
        
        return new AuditRecordPartitioner<>());
    }
    
    private String getNonOptionArgValue(List<String> nonOptionArgs, int index)  {
        return nonOptionArgs.get(index).split("=")[1];
    }
}


@Profile("worker")
@Configuration
public class WorkerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public DeployerStepExecutionHandler stepExecutionHandler(ApplicationContext context, JobExplorer jobExplorer,
            JobRepository jobRepository) {
        LOGGER.info("stepExecutionHandler...");
        return new DeployerStepExecutionHandler(context, jobExplorer, jobRepository);
    }

    @Bean
    public Step workerStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("workerStep").tasklet(workerTasklet(null)).build();
    }

    @Bean
    @StepScope
    public WorkerTasklet workerTasklet(@Value("#{stepExecutionContext['key']}") String key) {
        return new WorkerTasklet(key);
    }

    
}

注意到,我将gridSize和maxWorkers作为输入参数传递给主步骤(在启动任务时从SCDF传递)。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-07-03 16:01:01

正如马哈茂德·本·哈辛在评论中提到的,工作人员是按顺序发射

代码语言:javascript
复制
private void launchWorkers(Set<StepExecution> candidates,
            Set<StepExecution> executed) {
        for (StepExecution execution : candidates) {
            if (this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) {
                launchWorker(execution);
                this.currentWorkers++;

                executed.add(execution);
            }
        }
    }

正如格伦·伦佛罗在注释中提到的那样,已经为相同的内容创建了一个问题。如果可以异步启动工作人员的解决方案,则将更新此答案。

票数 0
EN

Stack Overflow用户

发布于 2021-06-25 13:22:30

为了演示目的,示例将最大工人数设置为2 这里。因此,对于您的40个分区,只有两个工作人员将并行启动,这使您认为您的分区是按顺序处理的。

您需要更新示例(或使其可配置),并根据需要增加并发工作人员的数量。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68117167

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档