From 87e3714ecbcb256e854640873fd4df9c17bb495d Mon Sep 17 00:00:00 2001 From: Lars Uffmann Date: Fri, 3 Nov 2023 11:11:04 +0100 Subject: [PATCH 1/5] Merge all previous parameters in case of a job restart --- .../batch/JobLauncherApplicationRunner.java | 14 +----- .../JobLauncherApplicationRunnerTests.java | 43 +++++++++++++++++++ 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java index b4415ac365ef..6bd8fe96000a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java @@ -19,7 +19,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; @@ -230,8 +229,7 @@ private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) private JobParameters getNextJobParametersForExisting(Job job, JobParameters jobParameters) { JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(), jobParameters); if (isStoppedOrFailed(lastExecution) && job.isRestartable()) { - JobParameters previousIdentifyingParameters = getGetIdentifying(lastExecution.getJobParameters()); - return merge(previousIdentifyingParameters, jobParameters); + return merge(lastExecution.getJobParameters(), jobParameters); } return jobParameters; } @@ -241,16 +239,6 @@ private boolean isStoppedOrFailed(JobExecution execution) { return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED); } - private JobParameters getGetIdentifying(JobParameters parameters) { - HashMap> nonIdentifying = new LinkedHashMap<>(parameters.getParameters().size()); - parameters.getParameters().forEach((key, value) -> { - if (value.isIdentifying()) { - nonIdentifying.put(key, value); - } - }); - return new JobParameters(nonIdentifying); - } - private JobParameters merge(JobParameters parameters, JobParameters additionals) { Map> merged = new LinkedHashMap<>(); merged.putAll(parameters.getParameters()); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java index b5149782e686..586c77c768e5 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; @@ -171,6 +172,48 @@ void retryFailedExecutionWithNonIdentifyingParameters() { }); } + @Test + void retryFailedExecutionWithNonIdentifyingParameters_previousParamsAreMerged() { + this.contextRunner.run((context) -> { + PlatformTransactionManager transactionManager = context.getBean(PlatformTransactionManager.class); + JobLauncherApplicationRunnerContext jobLauncherContext = new JobLauncherApplicationRunnerContext(context); + Job job = jobLauncherContext.jobBuilder() + .start(jobLauncherContext.stepBuilder().tasklet(throwingTasklet(), transactionManager).build()) + .incrementer(new RunIdIncrementer()) + .build(); + JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false) + .addLong("foo", 2L, false) + .toJobParameters(); + jobLauncherContext.runner.execute(job, jobParameters); + assertThat(jobLauncherContext.jobInstances()).hasSize(1); + + // try to re-run a failed execution with non identifying parameters + // identifying parameter 'run.id' is used to find the failed execution. + // non-identifying parameter 'id' is copied form the previous execution. + // non-identifying parameter 'foo' is replaced with a new value. + jobLauncherContext.runner.execute(job, + new JobParametersBuilder().addLong("run.id", 1L).addLong("foo", 3L, false).toJobParameters()); + + assertThat(jobLauncherContext.jobInstances()).hasSize(1); + + JobExecution initialExecution = jobLauncherContext.jobExplorer.getJobExecution(1L); + assertThat(initialExecution).isNotNull(); + JobParameters initialParams = initialExecution.getJobParameters(); + assertThat(initialParams.getParameters()).hasSize(3); + assertThat(initialParams.getLong("id")).isEqualTo(1L); + assertThat(initialParams.getLong("foo")).isEqualTo(2L); + assertThat(initialParams.getLong("run.id")).isEqualTo(1L); + + JobExecution retryExecution = jobLauncherContext.jobExplorer.getJobExecution(2L); + assertThat(retryExecution).isNotNull(); + JobParameters retryParams = retryExecution.getJobParameters(); + assertThat(retryParams.getParameters()).hasSize(3); + assertThat(retryParams.getLong("id")).isEqualTo(1L); + assertThat(retryParams.getLong("foo")).isEqualTo(3L); + assertThat(retryParams.getLong("run.id")).isEqualTo(1L); + }); + } + private Tasklet throwingTasklet() { return (contribution, chunkContext) -> { throw new RuntimeException("Planned"); From 753d0096e793028c52f3996161f8c7775bf27045 Mon Sep 17 00:00:00 2001 From: Lars Uffmann Date: Sat, 4 Nov 2023 10:25:15 +0100 Subject: [PATCH 2/5] Allow user supplied custom ExecutionContextSerializer bean --- .../batch/BatchAutoConfiguration.java | 14 +++++++- .../batch/BatchAutoConfigurationTests.java | 34 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java index 0474d2d0afe3..2dd5b5aed0e7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java @@ -28,7 +28,9 @@ import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.JobOperator; import org.springframework.batch.core.launch.support.SimpleJobOperator; +import org.springframework.batch.core.repository.ExecutionContextSerializer; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.ExitCodeGenerator; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -120,13 +122,18 @@ static class SpringBootBatchConfiguration extends DefaultBatchConfiguration { private final List batchConversionServiceCustomizers; + private final ExecutionContextSerializer executionContextSerializer; + SpringBootBatchConfiguration(DataSource dataSource, @BatchDataSource ObjectProvider batchDataSource, PlatformTransactionManager transactionManager, BatchProperties properties, - ObjectProvider batchConversionServiceCustomizers) { + ObjectProvider batchConversionServiceCustomizers, + ObjectProvider executionContextSerializer) { this.dataSource = batchDataSource.getIfAvailable(() -> dataSource); this.transactionManager = transactionManager; this.properties = properties; this.batchConversionServiceCustomizers = batchConversionServiceCustomizers.orderedStream().toList(); + this.executionContextSerializer = executionContextSerializer + .getIfAvailable(DefaultExecutionContextSerializer::new); } @Override @@ -160,6 +167,11 @@ protected ConfigurableConversionService getConversionService() { return conversionService; } + @Override + protected ExecutionContextSerializer getExecutionContextSerializer() { + return this.executionContextSerializer; + } + } @Configuration(proxyBeanMethods = false) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java index b57be44564d2..3e5fa2925bd0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java @@ -42,7 +42,10 @@ import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.repository.ExecutionContextSerializer; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer; +import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -458,6 +461,27 @@ void whenTheUserDefinesAJobNameThatDoesNotExistWithRegisteredJobFailsFast() { .withMessage("No job found with name 'three'"); } + @Test + void customExecutionContextSerializerIsUsed() { + this.contextRunner.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class) + .withUserConfiguration(CustomExecutionContextConfiguration.class) + .run((context) -> { + assertThat(context).hasSingleBean(Jackson2ExecutionContextStringSerializer.class); + assertThat(context.getBean(SpringBootBatchConfiguration.class).getExecutionContextSerializer()) + .isInstanceOf(Jackson2ExecutionContextStringSerializer.class); + }); + } + + @Test + void defaultExecutionContextSerializerIsUsed() { + this.contextRunner.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class) + .run((context) -> { + assertThat(context).doesNotHaveBean(ExecutionContextSerializer.class); + assertThat(context.getBean(SpringBootBatchConfiguration.class).getExecutionContextSerializer()) + .isInstanceOf(DefaultExecutionContextSerializer.class); + }); + } + private JobLauncherApplicationRunner createInstance(String... registeredJobNames) { JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(mock(JobLauncher.class), mock(JobExplorer.class), mock(JobRepository.class)); @@ -773,4 +797,14 @@ BatchConversionServiceCustomizer anotherBatchConversionServiceCustomizer() { } + @Configuration(proxyBeanMethods = false) + static class CustomExecutionContextConfiguration { + + @Bean + ExecutionContextSerializer executionContextSerializer() { + return new Jackson2ExecutionContextStringSerializer(); + } + + } + } From f2b8f6c89bc34ad2792762677a485329295bda19 Mon Sep 17 00:00:00 2001 From: Lars Uffmann Date: Sat, 4 Nov 2023 10:29:28 +0100 Subject: [PATCH 3/5] Remove unnecessary JobOperator bean definition --- .../batch/BatchAutoConfiguration.java | 18 ------------------ .../batch/BatchAutoConfigurationTests.java | 4 ++++ 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java index 2dd5b5aed0e7..a0533288b305 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java @@ -20,14 +20,10 @@ import javax.sql.DataSource; -import org.springframework.batch.core.configuration.ListableJobLocator; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration; -import org.springframework.batch.core.converter.JobParametersConverter; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.JobLauncher; -import org.springframework.batch.core.launch.JobOperator; -import org.springframework.batch.core.launch.support.SimpleJobOperator; import org.springframework.batch.core.repository.ExecutionContextSerializer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer; @@ -97,20 +93,6 @@ public JobExecutionExitCodeGenerator jobExecutionExitCodeGenerator() { return new JobExecutionExitCodeGenerator(); } - @Bean - @ConditionalOnMissingBean(JobOperator.class) - public SimpleJobOperator jobOperator(ObjectProvider jobParametersConverter, - JobExplorer jobExplorer, JobLauncher jobLauncher, ListableJobLocator jobRegistry, - JobRepository jobRepository) { - SimpleJobOperator factory = new SimpleJobOperator(); - factory.setJobExplorer(jobExplorer); - factory.setJobLauncher(jobLauncher); - factory.setJobRegistry(jobRegistry); - factory.setJobRepository(jobRepository); - jobParametersConverter.ifAvailable(factory::setJobParametersConverter); - return factory; - } - @Configuration(proxyBeanMethods = false) static class SpringBootBatchConfiguration extends DefaultBatchConfiguration { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java index 3e5fa2925bd0..f0dfad21c0d7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfigurationTests.java @@ -42,6 +42,7 @@ import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.JobOperator; import org.springframework.batch.core.repository.ExecutionContextSerializer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer; @@ -113,8 +114,11 @@ void testDefaultContext() { this.contextRunner.withInitializer(ConditionEvaluationReportLoggingListener.forLogLevel(LogLevel.INFO)) .withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class) .run((context) -> { + assertThat(context).hasSingleBean(JobRepository.class); assertThat(context).hasSingleBean(JobLauncher.class); assertThat(context).hasSingleBean(JobExplorer.class); + assertThat(context).hasSingleBean(JobRegistry.class); + assertThat(context).hasSingleBean(JobOperator.class); assertThat(context.getBean(BatchProperties.class).getJdbc().getInitializeSchema()) .isEqualTo(DatabaseInitializationMode.EMBEDDED); assertThat(new JdbcTemplate(context.getBean(DataSource.class)) From e8290e48f3acc419547fe1dc4722cca05e246fcd Mon Sep 17 00:00:00 2001 From: Lars Uffmann Date: Mon, 6 Nov 2023 11:23:17 +0100 Subject: [PATCH 4/5] Restore job parameter restart behavior --- .../batch/JobLauncherApplicationRunner.java | 4 +- .../JobLauncherApplicationRunnerTests.java | 42 ------------------- 2 files changed, 3 insertions(+), 43 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java index 6bd8fe96000a..a343346eb3e6 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java @@ -229,7 +229,9 @@ private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) private JobParameters getNextJobParametersForExisting(Job job, JobParameters jobParameters) { JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(), jobParameters); if (isStoppedOrFailed(lastExecution) && job.isRestartable()) { - return merge(lastExecution.getJobParameters(), jobParameters); + JobParameters previousIdentifyingParameters = new JobParameters( + lastExecution.getJobParameters().getIdentifyingParameters()); + return merge(previousIdentifyingParameters, jobParameters); } return jobParameters; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java index 586c77c768e5..ca6715d0b582 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunnerTests.java @@ -172,48 +172,6 @@ void retryFailedExecutionWithNonIdentifyingParameters() { }); } - @Test - void retryFailedExecutionWithNonIdentifyingParameters_previousParamsAreMerged() { - this.contextRunner.run((context) -> { - PlatformTransactionManager transactionManager = context.getBean(PlatformTransactionManager.class); - JobLauncherApplicationRunnerContext jobLauncherContext = new JobLauncherApplicationRunnerContext(context); - Job job = jobLauncherContext.jobBuilder() - .start(jobLauncherContext.stepBuilder().tasklet(throwingTasklet(), transactionManager).build()) - .incrementer(new RunIdIncrementer()) - .build(); - JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false) - .addLong("foo", 2L, false) - .toJobParameters(); - jobLauncherContext.runner.execute(job, jobParameters); - assertThat(jobLauncherContext.jobInstances()).hasSize(1); - - // try to re-run a failed execution with non identifying parameters - // identifying parameter 'run.id' is used to find the failed execution. - // non-identifying parameter 'id' is copied form the previous execution. - // non-identifying parameter 'foo' is replaced with a new value. - jobLauncherContext.runner.execute(job, - new JobParametersBuilder().addLong("run.id", 1L).addLong("foo", 3L, false).toJobParameters()); - - assertThat(jobLauncherContext.jobInstances()).hasSize(1); - - JobExecution initialExecution = jobLauncherContext.jobExplorer.getJobExecution(1L); - assertThat(initialExecution).isNotNull(); - JobParameters initialParams = initialExecution.getJobParameters(); - assertThat(initialParams.getParameters()).hasSize(3); - assertThat(initialParams.getLong("id")).isEqualTo(1L); - assertThat(initialParams.getLong("foo")).isEqualTo(2L); - assertThat(initialParams.getLong("run.id")).isEqualTo(1L); - - JobExecution retryExecution = jobLauncherContext.jobExplorer.getJobExecution(2L); - assertThat(retryExecution).isNotNull(); - JobParameters retryParams = retryExecution.getJobParameters(); - assertThat(retryParams.getParameters()).hasSize(3); - assertThat(retryParams.getLong("id")).isEqualTo(1L); - assertThat(retryParams.getLong("foo")).isEqualTo(3L); - assertThat(retryParams.getLong("run.id")).isEqualTo(1L); - }); - } - private Tasklet throwingTasklet() { return (contribution, chunkContext) -> { throw new RuntimeException("Planned"); From c0b50ddc07e7987ce18f7e204d1f9b131956e996 Mon Sep 17 00:00:00 2001 From: Lars Uffmann Date: Mon, 6 Nov 2023 13:35:40 +0100 Subject: [PATCH 5/5] Add restarting a stopped or failed Job to batch howto. --- .../spring-boot-docs/src/docs/asciidoc/howto/batch.adoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/howto/batch.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/howto/batch.adoc index 6edbcd2e2adf..7006d994a3cd 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/howto/batch.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/howto/batch.adoc @@ -53,7 +53,11 @@ Consider the following command: This provides only one argument to the batch job: `someParameter=someValue`. +=== Restarting a stopped or failed Job +In order to restart a failed `Job`, all parameters (identifying and non-identifying) have to be re-specified on the command line. Non-identifying parameters are *not* copied from the previous execution. This allows them to be modified or removed. + +NOTE: When using a custom `JobParametersIncrementer`: Be prepared to gather all parameters managed by the incrementer in order to restart a failed execution. [[howto.batch.storing-job-repository]] === Storing the Job Repository