
Improve documentation with regard to chunk scanning

Open jpbassinello opened this issue 4 years ago • 4 comments

Bug description A simple fault-tolerant step configuration using NeverRetryPolicy and AlwaysSkipItemSkipPolicy always retries the items when an exception occurs. Unless I am missing something, I would expect not to see any retries at all.

Environment Spring Batch 4.3.3, JDK 15.0.2, Postgres 12.4

Steps to reproduce

  • Configure a faultTolerant step with the policies NeverRetryPolicy and AlwaysSkipItemSkipPolicy
  • Configure the writer to throw a RuntimeException
  • Launch a job with that step
  • Check number of times that the writer is called

Expected behavior With 5 items to be processed, I would expect the service to be called only 5 times.

Minimal Complete Reproducible example

Job Configuration:

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
@Slf4j
public class ChunkSizeBatchConfig {

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final MockedService mockedService;

  @Bean
  Job chunkSizeJob() {

    var itemReader = new ListItemReader<>(List.of(1, 2, 3, 4, 5));

    var itemWriter = new ItemWriter<Integer>() {
      @Override
      public void write(@Nonnull List<? extends Integer> items) {
        log.info("chunkSizeJob processing item \"{}\"", items); // @Slf4j generates a field named "log"
        items.forEach(mockedService::doSomething);
      }
    };

    var step = stepBuilderFactory.get("testStep")
        .<Integer, Integer>chunk(1)
        .reader(itemReader)
        .writer(itemWriter)
        .faultTolerant()
        .skipPolicy(new AlwaysSkipItemSkipPolicy())
        .retryPolicy(new NeverRetryPolicy())
        .build();

    return jobBuilderFactory.get("testJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  }

  interface MockedService {
    void doSomething(int obj);
  }

}

Test class

@SpringBatchTest
@DataJpaTest(excludeAutoConfiguration = {TestDatabaseAutoConfiguration.class})
@EnableAutoConfiguration
@EntityScan("com.possiblefinance.ffp.adapter.out.persistence")
// overrides @SpringBatchTest @TestExecutionListeners
@TestExecutionListeners(
    listeners = {DBRiderTestExecutionListener.class, StepScopeTestExecutionListener.class, JobScopeTestExecutionListener.class},
    mergeMode = TestExecutionListeners.MergeMode.MERGE_WITH_DEFAULTS
)
// Spring Batch needs to open its own transaction for the test
@Transactional(propagation = Propagation.SUPPORTS)
@ContextConfiguration(classes = {ChunkSizeBatchConfig.class})
class ChunkSizeBatchConfigIT {

  @Autowired
  protected JobLauncherTestUtils jobLauncherTestUtils;

  @MockBean
  private ChunkSizeBatchConfig.MockedService mockedService;

  @Test
  @SneakyThrows
  void testJobCheckCompletedEvenWithException() {
    var jobParameters = new JobParametersBuilder()
        .addLong("testExecutionKey", System.currentTimeMillis())
        .addString(BatchConstants.JOB_PARAMETER_PARTITIONING_KEYS, "1,2,3")
        .toJobParameters();

    Mockito.doThrow(new RuntimeException("Any error"))
        .when(mockedService).doSomething(anyInt());

    var jobExecution1 = jobLauncherTestUtils.launchJob(jobParameters);
    assertThat(jobExecution1.getExitStatus().getExitCode()).isEqualTo("COMPLETED");

    verify(mockedService, times(5)).doSomething(anyInt());
  }

}

Test Output:

2021-06-18 16:13:04.462  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[1]"
2021-06-18 16:13:04.469  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[1]"
2021-06-18 16:13:04.477  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[2]"
2021-06-18 16:13:04.478  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[2]"
2021-06-18 16:13:04.485  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[3]"
2021-06-18 16:13:04.486  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[3]"
2021-06-18 16:13:04.493  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[4]"
2021-06-18 16:13:04.494  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[4]"
2021-06-18 16:13:04.502  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[5]"
2021-06-18 16:13:04.503  INFO 11727 --- [           main] c.p.f.a.j.configs.ChunkSizeBatchConfig   : chunkSizeJob processing item "[5]"
2021-06-18 16:13:04.522  INFO 11727 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [testStep] executed in 79ms
2021-06-18 16:13:04.539  INFO 11727 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testJob]] completed with the following parameters: [{testExecutionKey=1624032784328, partitioningKeys=1,2,3}] and the following status: [COMPLETED] in 115ms

org.mockito.exceptions.verification.TooManyActualInvocations: 
com.possiblefinance.ffp.application.jobs.configs.ChunkSizeBatchConfig$MockedService#0 bean.doSomething(
    <any integer>
);
Wanted 5 times:
-> at com.possiblefinance.ffp.application.jobs.configs.ChunkSizeBatchConfigIT.testJobCheckCompletedEvenWithException(ChunkSizeBatchConfigIT.java:52)
But was 10 times:
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)
-> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
-> at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4933)

jpbassinello avatar Jun 18 '21 16:06 jpbassinello

Thank you for opening this issue. I confirm that this can be surprising, but the behaviour you are seeing is actually chunk scanning, which is triggered when an exception is thrown from the ChunkProcessor (i.e. the item processor or item writer) in a fault-tolerant step. This is probably a documentation issue, as the Configuring Retry Logic and Configuring Skip Logic sections do not mention chunk scanning, but there is a sample that explains the expected behaviour in detail.

I think what is missing is an option to opt out of chunk scanning as requested in #2071. Do you agree?

fmbenhassine avatar Jun 22 '21 13:06 fmbenhassine

Thank you @benas. I understand the issue better now. I was able to move the logic from the ItemWriter to the ItemProcessor as suggested in #2071, and it is working as expected now. For me, opting out of chunk scanning could work, but it feels like a workaround for something that should not happen in the first place. Sorry if I don't have all the context and implementation details, but retrying an item that was already processed just to verify each one seems odd to me. Anyway, let me know if this is something I can help with; I would be happy to submit a PR if I can get some direction on how to fix this without breaking backwards compatibility.
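For reference, that workaround can be sketched roughly as follows. This is a hypothetical adaptation of the chunkSizeJob() bean from the report above (same reader, writer, and policies), not the reporter's actual change: the service call moves from the ItemWriter into an ItemProcessor, so a failure is attributed to a single item, which AlwaysSkipItemSkipPolicy then skips.

```java
// Hypothetical sketch of the workaround: the service call moves from the
// ItemWriter into an ItemProcessor, so a failing item can be skipped
// individually by the AlwaysSkipItemSkipPolicy.
var itemProcessor = new ItemProcessor<Integer, Integer>() {
    @Override
    public Integer process(Integer item) {
        mockedService.doSomething(item); // may throw; the item is then skipped
        return item;
    }
};

var step = stepBuilderFactory.get("testStep")
    .<Integer, Integer>chunk(1)
    .reader(itemReader)
    .processor(itemProcessor)
    .writer(itemWriter)
    .faultTolerant()
    .skipPolicy(new AlwaysSkipItemSkipPolicy())
    .retryPolicy(new NeverRetryPolicy())
    .build();
```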

jpbassinello avatar Jun 22 '21 16:06 jpbassinello

Sorry if I don't have all the context and implementation details, but retrying an item that was already processed just to verify each one seems odd to me.

This is because chunk scanning retries items one by one at the ChunkProcessor level (which involves the ItemProcessor and the ItemWriter). That's why the documentation mentions that in a fault-tolerant step, the item processor should either be designed in an idempotent way or be marked as non-transactional with the processorNonTransactional flag (which re-uses processed items from the cache between retries).
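For completeness, that flag is set on the fault-tolerant step builder. Here is a minimal configuration sketch using the same Spring Batch 4.x builder API as the examples in this thread; the bean wiring and names are illustrative, not taken from the reporter's code:

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.policy.NeverRetryPolicy;

@Configuration
public class ProcessorNonTransactionalConfig {

    @Bean
    public Step step(StepBuilderFactory steps,
                     ItemReader<Integer> reader,
                     ItemProcessor<Integer, Integer> processor,
                     ItemWriter<Integer> writer) {
        return steps.get("step")
                .<Integer, Integer>chunk(3)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .retryPolicy(new NeverRetryPolicy())
                // During chunk scanning, re-use already-processed items from
                // the chunk cache instead of running the processor again.
                // Only appropriate when the processor does not need to take
                // part in the transaction (hence the name).
                .processorNonTransactional()
                .build();
    }
}
```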

I believe you expected no retry to happen when supplying a NeverRetryPolicy because the documentation does not mention that chunk scanning is triggered anyway, and because your example uses a chunk size of 1, which makes the scan look like a retry. Here is a quick example with a SimpleRetryPolicy of at most 5 attempts:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.policy.SimpleRetryPolicy;

@Configuration
@EnableBatchProcessing
public class GH3946 {

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            System.out.println("about to write items: " + items);
            if (items.contains(3) || items.contains(5)) {
                throw new Exception("expected");
            }
            for (Integer item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(3)
                        .reader(itemReader())
                        .writer(itemWriter())
                        .faultTolerant()
                        .retryPolicy(new SimpleRetryPolicy(5))
                        .skipPolicy(new AlwaysSkipItemSkipPolicy())
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(GH3946.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParameters();
        jobLauncher.run(job, jobParameters);
    }

}

This prints:

about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1, 2, 3]
about to write items: [1]
item = 1
about to write items: [2]
item = 2
about to write items: [3]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4, 5, 6]
about to write items: [4]
item = 4
about to write items: [5]
about to write items: [6]
item = 6

This shows that each chunk is attempted at most 5 times before chunk scanning is triggered. Now if you replace the SimpleRetryPolicy with a NeverRetryPolicy, you will see that failed chunks are never retried (as expected with the NeverRetryPolicy) and that chunk scanning starts right away:

about to write items: [1, 2, 3]
about to write items: [1]
item = 1
about to write items: [2]
item = 2
about to write items: [3]
about to write items: [4, 5, 6]
about to write items: [4]
item = 4
about to write items: [5]
about to write items: [6]
item = 6

So I believe this is only a documentation issue: the Configuring Retry Logic and Configuring Skip Logic sections should be updated with more details about the scanning feature.

fmbenhassine avatar Jun 24 '21 12:06 fmbenhassine

I don't understand the explanation. The chunks themselves are perhaps not retried, but even with NeverRetryPolicy it looks like every individual failed element is. I don't find this consistent: if I set NeverRetryPolicy, I expect no retries at all.

Also, when the chunk size is > 1, in my case at least, on an individual error the successful items are re-written as well.

nightswimmings avatar Apr 16 '24 12:04 nightswimmings