spring-batch
Improve documentation with regard to chunk scanning
Bug description
A simple fault-tolerant step configured with a NeverRetryPolicy and an AlwaysSkipItemSkipPolicy always retries items when an exception occurs. Unless I am missing something, I would expect to see no retries at all.
Environment: Spring Batch 4.3.3, JDK 15.0.2, Postgres 12.4
Steps to reproduce
- Configure a faultTolerant step with the policies NeverRetryPolicy and AlwaysSkipItemSkipPolicy
- Configure the writer to throw a RuntimeException
- Launch a job with that step
- Check number of times that the writer is called
Expected behavior With 5 items to be processed, I would expect the service to be called only 5 times.
Minimal Complete Reproducible example
Job Configuration:
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
@Slf4j
public class ChunkSizeBatchConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final MockedService mockedService;

    @Bean
    Job chunkSizeJob() {
        var itemReader = new ListItemReader<>(List.of(1, 2, 3, 4, 5));
        var itemWriter = new ItemWriter<Integer>() {
            @Override
            public void write(@Nonnull List<? extends Integer> items) {
                // @Slf4j generates a field named "log", not "logger"
                log.info("chunkSizeJob processing item \"{}\"", items);
                items.forEach(mockedService::doSomething);
            }
        };
        var step = stepBuilderFactory.get("testStep")
                .<Integer, Integer>chunk(1)
                .reader(itemReader)
                .writer(itemWriter)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .retryPolicy(new NeverRetryPolicy())
                .build();
        return jobBuilderFactory.get("testJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    interface MockedService {
        void doSomething(int obj);
    }
}
Test class
@SpringBatchTest
@DataJpaTest(excludeAutoConfiguration = {TestDatabaseAutoConfiguration.class})
@EnableAutoConfiguration
@EntityScan("com.possiblefinance.ffp.adapter.out.persistence")
// overrides the @TestExecutionListeners declared by @SpringBatchTest
@TestExecutionListeners(
        listeners = {DBRiderTestExecutionListener.class, StepScopeTestExecutionListener.class, JobScopeTestExecutionListener.class},
        mergeMode = TestExecutionListeners.MergeMode.MERGE_WITH_DEFAULTS
)
// Spring Batch needs to open its own transaction for the test
@Transactional(propagation = Propagation.SUPPORTS)
@ContextConfiguration(classes = {ChunkSizeBatchConfig.class})
class ChunkSizeBatchConfigIT {

    @Autowired
    protected JobLauncherTestUtils jobLauncherTestUtils;

    @MockBean
    private ChunkSizeBatchConfig.MockedService mockedService;

    @Test
    @SneakyThrows
    void testJobCheckCompletedEvenWithException() {
        var jobParameters = new JobParametersBuilder()
                .addLong("testExecutionKey", System.currentTimeMillis())
                .addString(BatchConstants.JOB_PARAMETER_PARTITIONING_KEYS, "1,2,3")
                .toJobParameters();
        Mockito.doThrow(new RuntimeException("Any error"))
                .when(mockedService).doSomething(anyInt());

        var jobExecution1 = jobLauncherTestUtils.launchJob(jobParameters);

        assertThat(jobExecution1.getExitStatus().getExitCode()).isEqualTo("COMPLETED");
        verify(mockedService, times(5)).doSomething(anyInt());
    }
}
Test Output:
2021-06-18 16:13:04.462 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[1]"
2021-06-18 16:13:04.469 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[1]"
2021-06-18 16:13:04.477 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[2]"
2021-06-18 16:13:04.478 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[2]"
2021-06-18 16:13:04.485 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[3]"
2021-06-18 16:13:04.486 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[3]"
2021-06-18 16:13:04.493 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[4]"
2021-06-18 16:13:04.494 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[4]"
2021-06-18 16:13:04.502 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[5]"
2021-06-18 16:13:04.503 INFO 11727 --- [ main] c.p.f.a.j.configs.ChunkSizeBatchConfig : chunkSizeJob processing item "[5]"
2021-06-18 16:13:04.522 INFO 11727 --- [ main] o.s.batch.core.step.AbstractStep : Step: [testStep] executed in 79ms
2021-06-18 16:13:04.539 INFO 11727 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=testJob]] completed with the following parameters: [{testExecutionKey=1624032784328, partitioningKeys=1,2,3}] and the following status: [COMPLETED] in 115ms
org.mockito.exceptions.verification.TooManyActualInvocations:
com.possiblefinance.ffp.application.jobs.configs.ChunkSizeBatchConfig$MockedService#0 bean.doSomething(
<any integer>
);
Wanted 5 times:
-> at com.possiblefinance.ffp.application.jobs.configs.ChunkSizeBatchConfigIT.testJobCheckCompletedEvenWithException(ChunkSizeBatchConfigIT.java:52)
But was 10 times:
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
Thank you for opening this issue. I confirm that this can be surprising, but the behaviour you are seeing is actually chunk scanning, which is triggered when an exception is thrown from the ChunkProcessor (i.e., the item processor or item writer) in a fault-tolerant step. This is probably a documentation issue, as the Configuring Retry Logic and Configuring Skip Logic sections do not mention chunk scanning, but there is a sample that explains the expected behaviour in detail.
I think what is missing is an option to opt out of chunk scanning as requested in #2071. Do you agree?
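To make the mechanism concrete, here is a simplified, hypothetical simulation of what chunk scanning does (this is not the real ChunkProcessor code, just a sketch of the control flow): a failed chunk is retried as a whole until the retry policy is exhausted, then re-written one item at a time so the skip policy can isolate the faulty items.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a toy model of chunk scanning, not Spring Batch internals.
public class ChunkScanSimulation {

    // Pretend writer: fails whenever the chunk contains item 3 or 5,
    // mirroring the writer in the example below.
    static void write(List<Integer> items) throws Exception {
        System.out.println("about to write items: " + items);
        if (items.contains(3) || items.contains(5)) {
            throw new Exception("expected");
        }
    }

    // Write one chunk with up to maxAttempts attempts, then scan item by item.
    static List<Integer> processChunk(List<Integer> chunk, int maxAttempts) {
        List<Integer> skipped = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                write(chunk);
                return skipped; // whole chunk succeeded, nothing skipped
            } catch (Exception e) {
                // swallow and retry until the retry policy is exhausted
            }
        }
        // Retries exhausted: "scan" the chunk by writing each item alone.
        for (Integer item : chunk) {
            try {
                write(List.of(item));
            } catch (Exception e) {
                skipped.add(item); // the skip policy skips the faulty item
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        List<Integer> skipped = new ArrayList<>();
        for (List<Integer> chunk : List.of(List.of(1, 2, 3), List.of(4, 5, 6))) {
            skipped.addAll(processChunk(new ArrayList<>(chunk), 5));
        }
        System.out.println("skipped items: " + skipped); // prints: skipped items: [3, 5]
    }
}
```

With maxAttempts = 5 this reproduces the log pattern shown further down: five whole-chunk attempts, then one single-item write per item; with maxAttempts = 1 (the NeverRetryPolicy case) scanning starts right after the first failure.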
Thank you @benas. I understand the issue better now. I was able to move the logic from the ItemWriter to the ItemProcessor as suggested in #2071, and it is working as expected now. For me, opting out of chunk scanning could work, but it seems like a workaround for something that should not happen. Sorry if I don't have all the context and implementation details, but re-running items that were already processed just to find out which item is causing the failure seems odd to me. Anyway, let me know if this is something I can help with; I would be happy to submit a PR if I can get some direction on how to fix this without breaking backwards compatibility.
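That move could look roughly like the following sketch (based on the reproduction config above; the processor and writer bodies here are hypothetical): the service call happens in the ItemProcessor, so the writer itself no longer throws.

```java
// Sketch only: the failing call is moved out of the ItemWriter into an
// ItemProcessor, so a writer failure no longer hides the faulty item.
var step = stepBuilderFactory.get("testStep")
        .<Integer, Integer>chunk(1)
        .reader(itemReader)
        .processor(item -> {
            mockedService.doSomething(item); // moved out of the writer
            return item;
        })
        .writer(items -> log.info("writing {}", items))
        .faultTolerant()
        .skipPolicy(new AlwaysSkipItemSkipPolicy())
        .retryPolicy(new NeverRetryPolicy())
        .build();
```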
Sorry if I don't have all the context and implementation details, but re-running items that were already processed just to find out which item is causing the failure seems odd to me.
This is because chunk scanning retries items one by one at the ChunkProcessor level (which involves the ItemProcessor and the ItemWriter), and that's why the documentation clearly mentions that in a fault-tolerant step, the item processor should either be designed to be idempotent or marked as non-transactional with the processorNonTransactional flag (which re-uses processed items from the cache between retries).
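As an illustration of that flag, a step configuration might look like this (a sketch only: the reader, processor, and writer variables are hypothetical, while processorNonTransactional() is the actual method on the fault-tolerant step builder):

```java
// Sketch only: with processorNonTransactional(), processed items are
// cached and re-used during retries and chunk scanning instead of being
// pushed through the ItemProcessor again.
var step = stepBuilderFactory.get("step")
        .<Integer, Integer>chunk(3)
        .reader(itemReader)
        .processor(itemProcessor)
        .writer(itemWriter)
        .faultTolerant()
        .processorNonTransactional()
        .skipPolicy(new AlwaysSkipItemSkipPolicy())
        .build();
```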
I believe you expected no retries with a NeverRetryPolicy because the documentation does not mention that chunk scanning is triggered anyway, and because your example uses a chunk size of 1, which makes scanning look like a retry. Here is a quick example with a SimpleRetryPolicy allowing a maximum of 5 attempts:
import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.policy.SimpleRetryPolicy;

@Configuration
@EnableBatchProcessing
public class GH3946 {

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            System.out.println("about to write items: " + items);
            if (items.contains(3) || items.contains(5)) {
                throw new Exception("expected");
            }
            for (Integer item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(3)
                        .reader(itemReader())
                        .writer(itemWriter())
                        .faultTolerant()
                        .retryPolicy(new SimpleRetryPolicy(5))
                        .skipPolicy(new AlwaysSkipItemSkipPolicy())
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(GH3946.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParameters();
        jobLauncher.run(job, jobParameters);
    }
}
This prints:
about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1]
item = 1
about to write items: [2]
item = 2
about to write items: [3]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4]
item = 4
about to write items: [5]
about to write items: [6]
item = 6
This shows that each chunk is retried at most 5 times before chunk scanning is triggered. Now if you replace the SimpleRetryPolicy with a NeverRetryPolicy, you will see that failed chunks are never retried as a whole (as expected when setting the NeverRetryPolicy) and that chunk scanning starts right away:
about to write items: [1, 2, 3]
about to write items: [1]
item = 1
about to write items: [2]
item = 2
about to write items: [3]
about to write items: [4, 5, 6]
about to write items: [4]
item = 4
about to write items: [5]
about to write items: [6]
item = 6
So I believe this is only a documentation issue, where the Configuring Retry Logic and Configuring Skip Logic sections should be updated with more details about the scanning feature.
I don't understand the explanation. The chunks themselves may not be retried, but even with a NeverRetryPolicy it looks like every individual failed item is. I don't find this consistent: if I set a NeverRetryPolicy, I expect no retries at all.
Also, when the chunk size is greater than 1, in my case at least, a single failing item causes the successful items in the chunk to be re-written as well.