Skip to content

Commit 7bb6df4

Browse files
committed
Merge pull request #14933 from benas
* pr/14933: Polish "Fix Spring Batch job restart parameters handling" Fix Spring Batch job restart parameters handling
2 parents d1ce315 + 62b9268 commit 7bb6df4

File tree

3 files changed

+175
-23
lines changed

3 files changed

+175
-23
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
* @author Dave Syer
5757
* @author Eddú Meléndez
5858
* @author Kazuki Shimizu
59+
* @author Mahmoud Ben Hassine
5960
*/
6061
@Configuration
6162
@ConditionalOnClass({ JobLauncher.class, DataSource.class, JdbcOperations.class })
@@ -88,9 +89,10 @@ public BatchDataSourceInitializer batchDataSourceInitializer(DataSource dataSour
8889
@ConditionalOnMissingBean
8990
@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
9091
public JobLauncherCommandLineRunner jobLauncherCommandLineRunner(
91-
JobLauncher jobLauncher, JobExplorer jobExplorer) {
92+
JobLauncher jobLauncher, JobExplorer jobExplorer,
93+
JobRepository jobRepository) {
9294
JobLauncherCommandLineRunner runner = new JobLauncherCommandLineRunner(
93-
jobLauncher, jobExplorer);
95+
jobLauncher, jobExplorer, jobRepository);
9496
String jobNames = this.properties.getJob().getNames();
9597
if (StringUtils.hasText(jobNames)) {
9698
runner.setJobNames(jobNames);

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunner.java

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2017 the original author or authors.
2+
* Copyright 2012-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,14 +19,19 @@
1919
import java.util.Arrays;
2020
import java.util.Collection;
2121
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.LinkedHashMap;
24+
import java.util.Map;
2225
import java.util.Properties;
2326

2427
import org.apache.commons.logging.Log;
2528
import org.apache.commons.logging.LogFactory;
2629

30+
import org.springframework.batch.core.BatchStatus;
2731
import org.springframework.batch.core.Job;
2832
import org.springframework.batch.core.JobExecution;
2933
import org.springframework.batch.core.JobExecutionException;
34+
import org.springframework.batch.core.JobParameter;
3035
import org.springframework.batch.core.JobParameters;
3136
import org.springframework.batch.core.JobParametersBuilder;
3237
import org.springframework.batch.core.JobParametersInvalidException;
@@ -39,12 +44,14 @@
3944
import org.springframework.batch.core.launch.NoSuchJobException;
4045
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
4146
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
47+
import org.springframework.batch.core.repository.JobRepository;
4248
import org.springframework.batch.core.repository.JobRestartException;
4349
import org.springframework.beans.factory.annotation.Autowired;
4450
import org.springframework.boot.CommandLineRunner;
4551
import org.springframework.context.ApplicationEventPublisher;
4652
import org.springframework.context.ApplicationEventPublisherAware;
4753
import org.springframework.core.Ordered;
54+
import org.springframework.util.Assert;
4855
import org.springframework.util.PatternMatchUtils;
4956
import org.springframework.util.StringUtils;
5057

@@ -55,6 +62,7 @@
5562
*
5663
* @author Dave Syer
5764
* @author Jean-Pierre Bergamin
65+
* @author Mahmoud Ben Hassine
5866
*/
5967
public class JobLauncherCommandLineRunner
6068
implements CommandLineRunner, Ordered, ApplicationEventPublisherAware {
@@ -69,11 +77,13 @@ public class JobLauncherCommandLineRunner
6977

7078
private JobParametersConverter converter = new DefaultJobParametersConverter();
7179

72-
private JobLauncher jobLauncher;
80+
private final JobLauncher jobLauncher;
7381

74-
private JobRegistry jobRegistry;
82+
private final JobExplorer jobExplorer;
83+
84+
private final JobRepository jobRepository;
7585

76-
private JobExplorer jobExplorer;
86+
private JobRegistry jobRegistry;
7787

7888
private String jobNames;
7989

@@ -83,10 +93,38 @@ public class JobLauncherCommandLineRunner
8393

8494
private ApplicationEventPublisher publisher;
8595

96+
/**
97+
* Create a new {@link JobLauncherCommandLineRunner}.
98+
* @param jobLauncher to launch jobs
99+
* @param jobExplorer to check the job repository for previous executions
100+
* @deprecated since 2.0.7 in favor of
101+
* {@link #JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}. A
102+
* job repository is required to check if a job instance exists with the given
103+
* parameters when running a job (which is not possible with the job explorer).
104+
*/
105+
@Deprecated
86106
public JobLauncherCommandLineRunner(JobLauncher jobLauncher,
87107
JobExplorer jobExplorer) {
88108
this.jobLauncher = jobLauncher;
89109
this.jobExplorer = jobExplorer;
110+
this.jobRepository = null;
111+
}
112+
113+
/**
114+
* Create a new {@link JobLauncherCommandLineRunner}.
115+
* @param jobLauncher to launch jobs
116+
* @param jobExplorer to check the job repository for previous executions
117+
* @param jobRepository to check if a job instance exists with the given parameters
118+
* when running a job
119+
*/
120+
public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
121+
JobRepository jobRepository) {
122+
Assert.notNull(jobLauncher, "JobLauncher must not be null");
123+
Assert.notNull(jobExplorer, "JobExplorer must not be null");
124+
Assert.notNull(jobRepository, "JobRepository must not be null");
125+
this.jobLauncher = jobLauncher;
126+
this.jobExplorer = jobExplorer;
127+
this.jobRepository = jobRepository;
90128
}
91129

92130
public void setOrder(int order) {
@@ -135,6 +173,20 @@ protected void launchJobFromProperties(Properties properties)
135173
executeRegisteredJobs(jobParameters);
136174
}
137175

176+
private void executeLocalJobs(JobParameters jobParameters)
177+
throws JobExecutionException {
178+
for (Job job : this.jobs) {
179+
if (StringUtils.hasText(this.jobNames)) {
180+
String[] jobsToRun = this.jobNames.split(",");
181+
if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
182+
logger.debug("Skipped job: " + job.getName());
183+
continue;
184+
}
185+
}
186+
execute(job, jobParameters);
187+
}
188+
}
189+
138190
private void executeRegisteredJobs(JobParameters jobParameters)
139191
throws JobExecutionException {
140192
if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) {
@@ -158,26 +210,59 @@ protected void execute(Job job, JobParameters jobParameters)
158210
throws JobExecutionAlreadyRunningException, JobRestartException,
159211
JobInstanceAlreadyCompleteException, JobParametersInvalidException,
160212
JobParametersNotFoundException {
161-
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
162-
this.jobExplorer).getNextJobParameters(job).toJobParameters();
163-
JobExecution execution = this.jobLauncher.run(job, nextParameters);
213+
JobParameters parameters = getNextJobParameters(job, jobParameters);
214+
JobExecution execution = this.jobLauncher.run(job, parameters);
164215
if (this.publisher != null) {
165216
this.publisher.publishEvent(new JobExecutionEvent(execution));
166217
}
167218
}
168219

169-
private void executeLocalJobs(JobParameters jobParameters)
170-
throws JobExecutionException {
171-
for (Job job : this.jobs) {
172-
if (StringUtils.hasText(this.jobNames)) {
173-
String[] jobsToRun = this.jobNames.split(",");
174-
if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
175-
logger.debug("Skipped job: " + job.getName());
176-
continue;
177-
}
178-
}
179-
execute(job, jobParameters);
220+
private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
221+
if (this.jobRepository != null
222+
&& this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) {
223+
return getNextJobParametersForExisting(job, jobParameters);
224+
}
225+
if (job.getJobParametersIncrementer() == null) {
226+
return jobParameters;
180227
}
228+
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
229+
this.jobExplorer).getNextJobParameters(job).toJobParameters();
230+
return merge(nextParameters, jobParameters);
231+
}
232+
233+
private JobParameters getNextJobParametersForExisting(Job job,
234+
JobParameters jobParameters) {
235+
JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(),
236+
jobParameters);
237+
if (isStoppedOrFailed(lastExecution) && job.isRestartable()) {
238+
JobParameters previousIdentifyingParameters = getGetIdentifying(
239+
lastExecution.getJobParameters());
240+
return merge(previousIdentifyingParameters, jobParameters);
241+
}
242+
return jobParameters;
243+
}
244+
245+
private boolean isStoppedOrFailed(JobExecution execution) {
246+
BatchStatus status = (execution != null) ? execution.getStatus() : null;
247+
return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
248+
}
249+
250+
private JobParameters getGetIdentifying(JobParameters parameters) {
251+
HashMap<String, JobParameter> nonIdentifying = new LinkedHashMap<>(
252+
parameters.getParameters().size());
253+
parameters.getParameters().forEach((key, value) -> {
254+
if (value.isIdentifying()) {
255+
nonIdentifying.put(key, value);
256+
}
257+
});
258+
return new JobParameters(nonIdentifying);
259+
}
260+
261+
private JobParameters merge(JobParameters parameters, JobParameters additionals) {
262+
Map<String, JobParameter> merged = new LinkedHashMap<>();
263+
merged.putAll(parameters.getParameters());
264+
merged.putAll(additionals.getParameters());
265+
return new JobParameters(merged);
181266
}
182267

183268
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunnerTests.java

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2017 the original author or authors.
2+
* Copyright 2012-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
2121
import org.junit.Test;
2222

2323
import org.springframework.batch.core.Job;
24+
import org.springframework.batch.core.JobExecution;
25+
import org.springframework.batch.core.JobInstance;
2426
import org.springframework.batch.core.JobParameters;
2527
import org.springframework.batch.core.JobParametersBuilder;
2628
import org.springframework.batch.core.Step;
@@ -34,6 +36,7 @@
3436
import org.springframework.batch.core.launch.support.RunIdIncrementer;
3537
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
3638
import org.springframework.batch.core.repository.JobRepository;
39+
import org.springframework.batch.core.repository.JobRestartException;
3740
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
3841
import org.springframework.batch.core.step.tasklet.Tasklet;
3942
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
@@ -43,12 +46,15 @@
4346
import org.springframework.transaction.PlatformTransactionManager;
4447

4548
import static org.assertj.core.api.Assertions.assertThat;
49+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
50+
import static org.assertj.core.api.Assertions.fail;
4651

4752
/**
4853
* Tests for {@link JobLauncherCommandLineRunner}.
4954
*
5055
* @author Dave Syer
5156
* @author Jean-Pierre Bergamin
57+
* @author Mahmoud Ben Hassine
5258
*/
5359
public class JobLauncherCommandLineRunnerTests {
5460

@@ -80,7 +86,8 @@ public void init() {
8086
this.step = this.steps.get("step").tasklet(tasklet).build();
8187
this.job = this.jobs.get("job").start(this.step).build();
8288
this.jobExplorer = this.context.getBean(JobExplorer.class);
83-
this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer);
89+
this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer,
90+
jobRepository);
8491
this.context.getBean(BatchConfiguration.class).clear();
8592
}
8693

@@ -113,8 +120,25 @@ public void retryFailedExecution() throws Exception {
113120
.start(this.steps.get("step").tasklet(throwingTasklet()).build())
114121
.incrementer(new RunIdIncrementer()).build();
115122
this.runner.execute(this.job, new JobParameters());
116-
this.runner.execute(this.job, new JobParameters());
123+
this.runner.execute(this.job,
124+
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
125+
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
126+
}
127+
128+
@Test
129+
public void runDifferentInstances() throws Exception {
130+
this.job = this.jobs.get("job")
131+
.start(this.steps.get("step").tasklet(throwingTasklet()).build()).build();
132+
// start a job instance
133+
JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo")
134+
.toJobParameters();
135+
this.runner.execute(this.job, jobParameters);
117136
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
137+
// start a different job instance
138+
JobParameters otherJobParameters = new JobParametersBuilder()
139+
.addString("name", "bar").toJobParameters();
140+
this.runner.execute(this.job, otherJobParameters);
141+
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
118142
}
119143

120144
@Test
@@ -127,6 +151,12 @@ public void retryFailedExecutionOnNonRestartableJob() throws Exception {
127151
// A failed job that is not restartable does not re-use the job params of
128152
// the last execution, but creates a new job instance when running it again.
129153
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
154+
assertThatExceptionOfType(JobRestartException.class).isThrownBy(() -> {
155+
// try to re-run a failed execution
156+
this.runner.execute(this.job,
157+
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
158+
fail("expected JobRestartException");
159+
}).withMessageContaining("JobInstance already exists and is not restartable");
130160
}
131161

132162
@Test
@@ -137,8 +167,43 @@ public void retryFailedExecutionWithNonIdentifyingParameters() throws Exception
137167
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
138168
.addLong("foo", 2L, false).toJobParameters();
139169
this.runner.execute(this.job, jobParameters);
170+
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
171+
// try to re-run a failed execution with non identifying parameters
172+
this.runner.execute(this.job, new JobParametersBuilder(jobParameters)
173+
.addLong("run.id", 1L).toJobParameters());
174+
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
175+
}
176+
177+
@Test
178+
public void retryFailedExecutionWithDifferentNonIdentifyingParametersFromPreviousExecution()
179+
throws Exception {
180+
this.job = this.jobs.get("job")
181+
.start(this.steps.get("step").tasklet(throwingTasklet()).build())
182+
.incrementer(new RunIdIncrementer()).build();
183+
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
184+
.addLong("foo", 2L, false).toJobParameters();
140185
this.runner.execute(this.job, jobParameters);
141186
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
187+
// try to re-run a failed execution with non identifying parameters
188+
this.runner.execute(this.job, new JobParametersBuilder().addLong("run.id", 1L)
189+
.addLong("id", 2L, false).addLong("foo", 3L, false).toJobParameters());
190+
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
191+
JobInstance jobInstance = this.jobExplorer.getJobInstance(0L);
192+
assertThat(this.jobExplorer.getJobExecutions(jobInstance)).hasSize(2);
193+
// first execution
194+
JobExecution firstJobExecution = this.jobExplorer.getJobExecution(0L);
195+
JobParameters parameters = firstJobExecution.getJobParameters();
196+
assertThat(parameters.getLong("run.id")).isEqualTo(1L);
197+
assertThat(parameters.getLong("id")).isEqualTo(1L);
198+
assertThat(parameters.getLong("foo")).isEqualTo(2L);
199+
// second execution
200+
JobExecution secondJobExecution = this.jobExplorer.getJobExecution(1L);
201+
parameters = secondJobExecution.getJobParameters();
202+
// identifying parameters should be the same as previous execution
203+
assertThat(parameters.getLong("run.id")).isEqualTo(1L);
204+
// non-identifying parameters should be the newly specified ones
205+
assertThat(parameters.getLong("id")).isEqualTo(2L);
206+
assertThat(parameters.getLong("foo")).isEqualTo(3L);
142207
}
143208

144209
private Tasklet throwingTasklet() {

0 commit comments

Comments
 (0)