spring-batch
spring-batch copied to clipboard
Concurrent execution of FlowJob may cause FlowExecutionException
Bug description
When flow is made by FlowBuilder, transition is not initialized. It may cause FlowExecutionException when multiple thread runs the same job object with different jobExecutions.
[pool-1-thread-11] ERROR org.springframework.batch.core.job.AbstractJob - Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:143)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:332)
at org.springframework.batch.core.job.builder.FlowJobBuilderTests.lambda$testConcurrentDecision$1(FlowJobBuilderTests.java:260)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=flow at state=flow.flow0 with exception
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:177)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:143)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
... 6 more
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=subflow1.step0, pattern=COMPLETED, next=subflow1.step1]]
at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:303)
at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitionsIfNotInitialized(SimpleFlow.java:268)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:139)
at org.springframework.batch.core.job.flow.support.state.FlowState.handle(FlowState.java:56)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:168)
Environment
v4.3.5, jdk17
Steps to reproduce
I've made some test code to reproduce it. You can reproduce it by running a following code within FlowJobBuilderTests.
@Test
@RepeatedTest(100) // since it's rarely occured.
public void testConcurrentDecision() throws Exception {
EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(embeddedDatabase);
factory.setTransactionManager(new DataSourceTransactionManager(embeddedDatabase));
factory.afterPropertiesSet();
jobRepository = factory.getObject();
SimpleFlow flow1 = new FlowBuilder<SimpleFlow>("subflow1").start(step1).next(step2).end();
FlowBuilder<FlowJobBuilder> builder = new JobBuilder("flow").repository(jobRepository).start(flow1)
.next(step3);
Job job = builder.build().build();
int nThreads = Runtime.getRuntime().availableProcessors();
CyclicBarrier barrier = new CyclicBarrier(nThreads);
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
List<Future<JobExecution>> futures = LongStream.range(0, nThreads)
.boxed()
.map(i -> {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("count", i)
.toJobParameters();
return jobRepository.createJobExecution("flow", jobParameters);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
).map(jobExecution -> pool.submit(() -> {
barrier.await();
job.execute(jobExecution);
return jobExecution;
}))
.toList();
List<JobExecution> jobExecutions = futures.stream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
throw new IllegalStateException(e);
}
})
.collect(Collectors.toList());
for (JobExecution jobExecution : jobExecutions) {
assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
}
}
Expected behavior
No exception.
Minimal Complete Reproducible example
See Steps to reproduce.
As a workround you have to make sure #afterPropertiesSet() is called during bean creation, e.g. by using a separate bean definition:
@Bean
public Flow partitionFlow() {
return new FlowBuilder<Flow>("flow")
.start(...)
.build();
}
Manually calling #afterPropertiesSet() on the built flow is another possibility.
@svenmeier yes i'm calling afterPropertiesSet for all flow..