spring-batch
Concurrent execution of FlowJob may cause FlowExecutionException
Bug description
When a flow is built with FlowBuilder, its transitions are not initialized up front; they are initialized on the first call to start(). This can cause a FlowExecutionException when multiple threads run the same job object with different JobExecutions.
[pool-1-thread-11] ERROR org.springframework.batch.core.job.AbstractJob - Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:143)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:332)
at org.springframework.batch.core.job.builder.FlowJobBuilderTests.lambda$testConcurrentDecision$1(FlowJobBuilderTests.java:260)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=flow at state=flow.flow0 with exception
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:177)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:143)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
... 6 more
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=subflow1.step0, pattern=COMPLETED, next=subflow1.step1]]
at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:303)
at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitionsIfNotInitialized(SimpleFlow.java:268)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:139)
at org.springframework.batch.core.job.flow.support.state.FlowState.handle(FlowState.java:56)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:168)
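The idea behind the fix can be sketched with plain JDK classes: build the shared lookup state once, on a single thread, before the object is handed to concurrent callers. This is a simplified model, not the actual SimpleFlow source; EagerFlowInitSketch, SketchFlow, initialize() and run() are hypothetical names, with initialize() standing in for afterPropertiesSet().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class EagerFlowInitSketch {

    // Hypothetical stand-in for a flow; NOT the real SimpleFlow implementation.
    static final class SketchFlow {
        private final Map<String, String> transitions = new HashMap<>();
        private boolean initialized;

        // Plays the role of afterPropertiesSet(): builds the transition table once.
        void initialize() {
            transitions.clear();
            transitions.put("step0", "step1");
            transitions.put("step1", "END");
            initialized = true;
        }

        // Read-only traversal; safe to share once initialize() has completed.
        String run() {
            if (!initialized) {
                throw new IllegalStateException("flow used before initialization");
            }
            String state = "step0";
            while (!"END".equals(state)) {
                state = transitions.get(state);
                if (state == null) {
                    throw new IllegalStateException("Missing state");
                }
            }
            return state;
        }
    }

    // Runs the same flow object on nThreads threads and counts successful runs.
    static int runConcurrently(int nThreads) {
        SketchFlow flow = new SketchFlow();
        flow.initialize(); // eager and single-threaded, before the flow is shared

        CyclicBarrier barrier = new CyclicBarrier(nThreads);
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < nThreads; i++) {
                futures.add(pool.submit(() -> {
                    barrier.await(); // release all workers at the same moment
                    return flow.run();
                }));
            }
            int completed = 0;
            for (Future<String> future : futures) {
                if ("END".equals(future.get())) {
                    completed++;
                }
            }
            return completed;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4) + " of 4 executions completed");
    }
}
```

Because the map is only read after initialize() returns, and submitting a task to the executor establishes a happens-before edge, the workers never observe a half-built table. Moving the initialize() call into run() behind an unsynchronized check would reintroduce the kind of race reported here.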
Environment
Spring Batch v4.3.5, JDK 17
Steps to reproduce
I've written a test that reproduces it. Add the following code to FlowJobBuilderTests and run it.
@Test
@RepeatedTest(100) // repeated because the failure occurs only rarely
public void testConcurrentDecision() throws Exception {
    EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
            .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
            .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
            .build();
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(embeddedDatabase);
    factory.setTransactionManager(new DataSourceTransactionManager(embeddedDatabase));
    factory.afterPropertiesSet();
    jobRepository = factory.getObject();
    SimpleFlow flow1 = new FlowBuilder<SimpleFlow>("subflow1").start(step1).next(step2).end();
    FlowBuilder<FlowJobBuilder> builder = new JobBuilder("flow").repository(jobRepository).start(flow1)
            .next(step3);
    Job job = builder.build().build();
    int nThreads = Runtime.getRuntime().availableProcessors();
    CyclicBarrier barrier = new CyclicBarrier(nThreads);
    ExecutorService pool = Executors.newFixedThreadPool(nThreads);
    List<Future<JobExecution>> futures = LongStream.range(0, nThreads)
            .boxed()
            .map(i -> {
                try {
                    JobParameters jobParameters = new JobParametersBuilder()
                            .addLong("count", i)
                            .toJobParameters();
                    return jobRepository.createJobExecution("flow", jobParameters);
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }
            ).map(jobExecution -> pool.submit(() -> {
                barrier.await();
                job.execute(jobExecution);
                return jobExecution;
            }))
            .toList();
    List<JobExecution> jobExecutions = futures.stream()
            .map(future -> {
                try {
                    return future.get();
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            })
            .collect(Collectors.toList());
    for (JobExecution jobExecution : jobExecutions) {
        assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
    }
}
Expected behavior
No exception.
Minimal Complete Reproducible example
See Steps to reproduce.
As a workaround, make sure #afterPropertiesSet() is called during bean creation, e.g. by declaring the flow as a separate bean definition:
@Bean
public Flow partitionFlow() {
    return new FlowBuilder<Flow>("flow")
            .start(...)
            .build();
}
Manually calling #afterPropertiesSet() on the built flow is another possibility.
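A minimal sketch of the manual variant, assuming the flow is built outside the Spring container, as in the test in this report. The class and method names InitializedFlows and initializedSubflow are hypothetical, and the two Step arguments stand in for whatever steps the flow uses. It needs Spring Batch on the classpath.

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.support.SimpleFlow;

final class InitializedFlows {

    // Builds a two-step subflow and initializes its transitions eagerly,
    // on the calling thread, before the flow is shared with a job.
    static SimpleFlow initializedSubflow(Step first, Step second) throws Exception {
        SimpleFlow flow = new FlowBuilder<SimpleFlow>("subflow1")
                .start(first)
                .next(second)
                .end();
        // SimpleFlow implements InitializingBean; the container would call this
        // for a bean definition, so here it has to be called by hand.
        flow.afterPropertiesSet();
        return flow;
    }
}
```

The returned flow can then be passed to the job builder in place of a flow that has not been initialized yet.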
@svenmeier Yes, I'm calling afterPropertiesSet for all flows.