
Concurrent execution of FlowJob may cause FlowExecutionException

Status: Open. acktsap opened this issue 2 years ago • 2 comments

Bug description

When a flow is built with FlowBuilder, its transitions are not initialized at build time. This can cause a FlowExecutionException when multiple threads run the same Job object with different JobExecutions.

[pool-1-thread-11] ERROR org.springframework.batch.core.job.AbstractJob - Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
	at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:143)
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:332)
	at org.springframework.batch.core.job.builder.FlowJobBuilderTests.lambda$testConcurrentDecision$1(FlowJobBuilderTests.java:260)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=flow at state=flow.flow0 with exception
	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:177)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:143)
	at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
	... 6 more
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=subflow1.step0, pattern=COMPLETED, next=subflow1.step1]]
	at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:303)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitionsIfNotInitialized(SimpleFlow.java:268)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:139)
	at org.springframework.batch.core.job.flow.support.state.FlowState.handle(FlowState.java:56)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:168)
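To illustrate the kind of race the trace points to, here is a minimal plain-Java sketch. It assumes, without confirming it against the Spring Batch source, that the failure comes from a lazily built transition table on a shared flow instance being rebuilt by more than one thread. The classes `LazyTransitions`, `UnsafeLazyTransitions` and `SafeLazyTransitions` and the state names are hypothetical; the exception message only imitates the one in the trace.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a flow whose transition table is built lazily on
// first use. NOT Spring Batch's SimpleFlow code; it only shows the pattern.
abstract class LazyTransitions {
    // ConcurrentHashMap keeps single reads and writes safe, so the only bug
    // left is the window between clear() and the puts in rebuild().
    protected final Map<String, String> transitions = new ConcurrentHashMap<>();

    protected void rebuild() {
        transitions.clear();
        transitions.put("subflow1.step0", "subflow1.step1");
        transitions.put("subflow1.step1", "step3");
    }

    protected String lookup(String state) {
        String next = transitions.get(state);
        if (next == null) {
            throw new IllegalArgumentException("Missing state for " + state);
        }
        return next;
    }

    abstract String next(String state);
}

class UnsafeLazyTransitions extends LazyTransitions {
    private volatile boolean initialized;

    @Override
    String next(String state) {
        if (!initialized) {   // several threads can pass this check at once...
            rebuild();        // ...so one may clear the map while another reads it
            initialized = true;
        }
        return lookup(state);
    }
}

class SafeLazyTransitions extends LazyTransitions {
    private volatile boolean initialized;

    @Override
    String next(String state) {
        if (!initialized) {
            synchronized (this) {   // double-checked locking on a volatile flag
                if (!initialized) {
                    rebuild();
                    initialized = true;   // published only after the table is full
                }
            }
        }
        return lookup(state);
    }
}
```

The unsafe variant can let one thread clear the table while another looks up a state, which produces a "Missing state" error like the one above. The double-checked variant builds the table exactly once, and no lookup runs until that build has finished.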

Environment

Spring Batch v4.3.5, JDK 17

Steps to reproduce

I've written a test that reproduces it. You can reproduce the issue by running the following test in FlowJobBuilderTests.

	@RepeatedTest(100) // repeated because the failure occurs only rarely
	public void testConcurrentDecision() throws Exception {
		EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
				.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
				.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
				.build();
		JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
		factory.setDataSource(embeddedDatabase);
		factory.setTransactionManager(new DataSourceTransactionManager(embeddedDatabase));
		factory.afterPropertiesSet();
		jobRepository = factory.getObject();

		SimpleFlow flow1 = new FlowBuilder<SimpleFlow>("subflow1").start(step1).next(step2).end();
		FlowBuilder<FlowJobBuilder> builder = new JobBuilder("flow").repository(jobRepository).start(flow1)
				.next(step3);
		Job job = builder.build().build();

		int nThreads = Runtime.getRuntime().availableProcessors();
		CyclicBarrier barrier = new CyclicBarrier(nThreads);
		ExecutorService pool = Executors.newFixedThreadPool(nThreads);
		List<Future<JobExecution>> futures = LongStream.range(0, nThreads)
				.boxed()
				.map(i -> {
							try {
								JobParameters jobParameters = new JobParametersBuilder()
										.addLong("count", i)
										.toJobParameters();
								return jobRepository.createJobExecution("flow", jobParameters);
							} catch (Exception e) {
								throw new IllegalStateException(e);
							}
						}
				).map(jobExecution -> pool.submit(() -> {
					barrier.await();
					job.execute(jobExecution);
					return jobExecution;
				}))
				.toList();

		List<JobExecution> jobExecutions = futures.stream()
				.map(future -> {
					try {
						return future.get();
					} catch (Exception e) {
						throw new IllegalStateException(e);
					}
				})
				.collect(Collectors.toList());
		pool.shutdown();
		for (JobExecution jobExecution : jobExecutions) {
			assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
		}
	}

Expected behavior

No exception; every JobExecution finishes with BatchStatus.COMPLETED.

Minimal Complete Reproducible example

See Steps to reproduce.

acktsap, Apr 11 '22 14:04

As a workaround, make sure #afterPropertiesSet() is called during bean creation, e.g. by declaring the flow as a separate bean definition:

@Bean
public Flow partitionFlow() {
    return new FlowBuilder<Flow>("flow")
            .start(...)
            .build();
}

Manually calling #afterPropertiesSet() on the built flow is another possibility.
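The workaround amounts to initializing the flow eagerly, on one thread, before it is shared. Below is a minimal plain-Java sketch of that pattern, assuming the flow needs an explicit init step. `EagerFlow`, `init()` and `EagerInitDemo` are hypothetical names standing in for a built flow and its afterPropertiesSet() call; they are not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical flow whose transition table is built by an explicit init call,
// in the spirit of calling afterPropertiesSet() on a built flow.
class EagerFlow {
    private final Map<String, String> transitions = new ConcurrentHashMap<>();
    private volatile boolean initialized;

    // Called once, on one thread, before the flow is shared.
    void init() {
        transitions.put("subflow1.step0", "subflow1.step1");
        transitions.put("subflow1.step1", "step3");
        initialized = true;
    }

    String next(String state) {
        if (!initialized) {
            throw new IllegalStateException("init() was not called");
        }
        String next = transitions.get(state);
        if (next == null) {
            throw new IllegalArgumentException("Missing state for " + state);
        }
        return next;
    }
}

class EagerInitDemo {
    // Initializes the flow on the calling thread, then lets nThreads workers
    // use it at the same moment (the CyclicBarrier plays the same role as in
    // the reproduction test). Returns each worker's result.
    static List<String> runConcurrently(int nThreads) {
        EagerFlow flow = new EagerFlow();
        flow.init(); // happens-before every task submitted to the pool below
        CyclicBarrier barrier = new CyclicBarrier(nThreads);
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < nThreads; i++) {
                futures.add(pool.submit(() -> {
                    barrier.await();
                    return flow.next("subflow1.step0");
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

`EagerInitDemo.runConcurrently(4)` should return four copies of `subflow1.step1`. Because `init()` runs before any task is submitted, every worker sees the finished table, and `next()` fails fast with an IllegalStateException if the init step was skipped.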

svenmeier, Jul 11 '22 11:07

@svenmeier Yes, I'm calling afterPropertiesSet() on all flows.

acktsap, Jul 25 '22 04:07