
Spring Batch issue with MultiResourceItemWriter and ClassifierCompositeItemWriter: incorrect record count written to the output files

Open · javaHelper opened this issue · 7 comments

I am using Spring Boot + Batch v2.7.1 in my project, and there appears to be a bug when reading with a FlatFileItemReader and writing through a ClassifierCompositeItemWriter that delegates to MultiResourceItemWriters: the itemCountLimitPerResource value is not respected and produces wrong results.

I am reading a CSV file and splitting it into multiple files, where each file should contain at most 5 records, but the code I developed produces a file with 7 records.

Code Uploaded here: https://github.com/javaHelper/bug-4660/tree/main/bug-4660

package com.example;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@EnableBatchProcessing
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class MultiResourceSplitApplication {

    public static void main(String[] args) {
        SpringApplication.run(MultiResourceSplitApplication.class, args);
    }
}
package com.example;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemWriterBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.builder.ClassifierCompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.classify.Classifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class MyJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public FlatFileItemReader<Employee> itemReader() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("empId", "firstName", "lastName", "role");

        DefaultLineMapper<Employee> employeeLineMapper = new DefaultLineMapper<>();
        employeeLineMapper.setLineTokenizer(tokenizer);
        employeeLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        employeeLineMapper.afterPropertiesSet();

        return new FlatFileItemReaderBuilder<Employee>()
                .name("flatFileReader")
                .linesToSkip(1)
                .resource(new ClassPathResource("employee.csv"))
                .lineMapper(employeeLineMapper)
                .build();
    }

    @Bean
    public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
        Classifier<Employee, ItemWriter<? super Employee>> classifier = new EmployeeClassifier(
                javaDeveloperItemWriter(), 
                pythonDeveloperItemWriter(), 
                cloudDeveloperItemWriter());
        
        return new ClassifierCompositeItemWriterBuilder<Employee>()
                .classifier(classifier)
                .build();
    }

    @Bean
    public ItemWriter<Employee> javaDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw1")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("javaDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("javaDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public ItemWriter<Employee> pythonDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw2")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("pythonDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("pythonDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public ItemWriter<Employee> cloudDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw3")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("cloudDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("cloudDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<Employee, Employee>chunk(3)
                .reader(itemReader())
                .writer(classifierCompositeItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step())
                .build();
    }
}
package com.example;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

import lombok.Setter;


@Setter
public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {
    private static final long serialVersionUID = 1L;
    private ItemWriter<Employee> javaDeveloperFileItemWriter;
    private ItemWriter<Employee> pythonDeveloperFileItemWriter;
    private ItemWriter<Employee> cloudDeveloperFileItemWriter;
    
    public EmployeeClassifier() {
        
    }

    public EmployeeClassifier(ItemWriter<Employee> javaDeveloperFileItemWriter,
                              ItemWriter<Employee> pythonDeveloperFileItemWriter,
                              ItemWriter<Employee> cloudDeveloperFileItemWriter) {
        this.javaDeveloperFileItemWriter = javaDeveloperFileItemWriter;
        this.pythonDeveloperFileItemWriter = pythonDeveloperFileItemWriter;
        this.cloudDeveloperFileItemWriter = cloudDeveloperFileItemWriter;
    }

    @Override
    public ItemWriter<? super Employee> classify(Employee employee) {
        if(employee.getRole().equals("Java Developer")){
            return javaDeveloperFileItemWriter;
        }
        else if(employee.getRole().equals("Python Developer")){
            return pythonDeveloperFileItemWriter;
        }
        return cloudDeveloperFileItemWriter;
    }
}
package com.example;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Employee {
    private String empId;
    private String firstName;
    private String lastName;
    private String role;

    @Override
    public String toString() {
        return empId + "," + firstName + "," + lastName + "," + role;
    }
}
package com.example;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {
    @Override
    public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
        return Employee.builder()
                .empId(fieldSet.readRawString("empId"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .role(fieldSet.readRawString("role"))
                .build();
    }
}

employee.csv

empId,firstName,lastName,role
1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer
10,Ravi,Doe,Python Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer
16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

Output: javaDeveloper-employee.csv-1

1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer

javaDeveloper-employee.csv-2

16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

pythonDeveloper-employee.csv-1

3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer

pythonDeveloper-employee.csv-2

10,Ravi,Doe,Python Developer

When I used chunk(0), it resulted in the correct count in each file, but with huge data the performance is very, very slow. Any suggestions?
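For reference, the sample CSV above has 9 Java Developer rows and 8 Python Developer rows. A tiny standalone sketch (class and method names are hypothetical, not Spring Batch code) of the file splits I would expect if itemCountLimitPerResource were enforced strictly per item:

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedSplit {

    // Splits `total` items into files of at most `limit` items each.
    static List<Integer> expectedFiles(int total, int limit) {
        List<Integer> files = new ArrayList<>();
        while (total > 0) {
            files.add(Math.min(limit, total));
            total -= limit;
        }
        return files;
    }

    public static void main(String[] args) {
        // Sample CSV above: 9 Java Developers, 8 Python Developers, limit 5
        System.out.println(expectedFiles(9, 5)); // expected split: [5, 4]
        System.out.println(expectedFiles(8, 5)); // expected split: [5, 3]
    }
}
```

Instead, the actual output above is [7, 2] for the Java writer and [7, 1] for the Python writer.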

javaHelper avatar Sep 10 '24 06:09 javaHelper

@fmbenhassine - Were you able to re-produce this issue?

javaHelper avatar Sep 19 '24 04:09 javaHelper

No, I did not start working on this yet. I would love to help if you can package all code snippets with a pom.xml in a zip or a repository that I can clone/run. There is a project template that you can use as a starting point here: https://github.com/spring-projects/spring-batch/blob/main/ISSUE_REPORTING.md

fmbenhassine avatar Sep 19 '24 07:09 fmbenhassine

@fmbenhassine - I've uploaded code here: https://github.com/javaHelper/bug-4660/tree/main/bug-4660. Could you please have a look?

javaHelper avatar Sep 19 '24 09:09 javaHelper

Great! Thank you for the sample. I will take a look and get back to you asap.

fmbenhassine avatar Sep 19 '24 12:09 fmbenhassine

Thank you for providing a minimal example, well done! I am able to reproduce the issue and I think this is a bug. In fact, the itemCountLimitPerResource is not respected in this case.

An interesting finding I noticed as well is that when I set the chunk size to 10, everything is written to the same file (i.e. the itemCountLimitPerResource is not respected either).

I will plan the fix in a future patch release, because as of now we are working on the upcoming 5.2. If someone manages to fix the issue, then a PR is welcome.

fmbenhassine avatar Sep 20 '24 10:09 fmbenhassine

I think the problem isn't related to the ClassifierCompositeItemWriter but is a duplicate of #1722.

The effect of itemCountLimitPerResource is indeed surprising, so it might be a good idea to change it. But the current behaviour is documented in the JavaDoc of MultiResourceItemWriter: https://github.com/spring-projects/spring-batch/blob/main/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/MultiResourceItemWriter.java#L37
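A minimal standalone simulation (not Spring Batch code; names are illustrative) of the rule described in that JavaDoc: the limit is only checked after a whole chunk has been written, so a file can exceed itemCountLimitPerResource by up to chunkSize - 1 items.

```java
import java.util.ArrayList;
import java.util.List;

public class RolloverSimulation {

    // Returns the number of items that end up in each output "file",
    // rolling over to a new file only when the count reaches the limit
    // at a chunk boundary.
    static List<Integer> simulate(int totalItems, int chunkSize, int limit) {
        List<Integer> fileCounts = new ArrayList<>();
        int currentCount = 0;
        for (int written = 0; written < totalItems; written += chunkSize) {
            int chunk = Math.min(chunkSize, totalItems - written);
            currentCount += chunk;       // the whole chunk goes to the current file
            if (currentCount >= limit) { // limit checked only after the chunk
                fileCounts.add(currentCount);
                currentCount = 0;        // next write opens a new file
            }
        }
        if (currentCount > 0) {
            fileCounts.add(currentCount);
        }
        return fileCounts;
    }

    public static void main(String[] args) {
        // 10 items, chunk size 3, limit 5: files of 6 and 4 items, not 5 and 5
        System.out.println(simulate(10, 3, 5));
    }
}
```

With a chunk size of 10, the 9 Java items arrive in a single chunk and the simulation puts all of them in one file, which is consistent with the observation above that a chunk size of 10 writes everything to the same file.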

hpoettker avatar Sep 20 '24 10:09 hpoettker

@fmbenhassine - Thanks. I agree it's a bug, and it's reproducible. When I tested in my project with a huge volume, using half a million records as itemCountLimitPerResource, I surprisingly saw different outputs.
I hope this will be fixed soon.

javaHelper avatar Sep 20 '24 16:09 javaHelper

@fmbenhassine - Can you please confirm when you are planning to have a fix?

javaHelper avatar Dec 13 '24 10:12 javaHelper

This is a duplicate of #1722. Unfortunately, it won't be in next week's 5.2.1, so I have planned it for 5.2.2 (for which the release date is not set yet as it depends on Spring Boot's planning; stay tuned).

fmbenhassine avatar Dec 13 '24 15:12 fmbenhassine

@javaHelper FYI, this was resolved in #4742. I tested the fix on your reproducer and now things work as expected. Many thanks to @hpoettker for the fix!

The patch should be in v5.2.2, which is planned for March 19th.

fmbenhassine avatar Feb 19 '25 14:02 fmbenhassine

@fmbenhassine - Many thanks

javaHelper avatar Feb 20 '25 04:02 javaHelper