spring-batch

MongoItemReader: use the MongoOperations `stream` method instead of `find` in `doPageRead`

Open ZJTAN97 opened this issue 2 years ago • 0 comments

Bug description

While using MongoItemReader, I configured my Step bean with faultTolerant(), a skipLimit of 5, and IllegalArgumentException as the skippable exception:

@Configuration
@RequiredArgsConstructor
public class PetJobConfig {

    private final PetRepo petRepo;

    private final JobRepository jobRepository;

    private final PlatformTransactionManager platformTransactionManager;

    private final MongoTemplate mongoTemplate;

    @Bean
    public Step readPetFromMongo() {
        return new StepBuilder("petReaderMongo", jobRepository)
                .allowStartIfComplete(true)
                .<PetDomain, PetDomain>chunk(1000, platformTransactionManager)
                .reader(petRepo.petReader())
                .writer(new PetWriter(mongoTemplate))
                .faultTolerant()
                .skipLimit(5)
                .skip(IllegalArgumentException.class)
                .build();
    }

    @Bean
    public Job readPetFromMongoJob() {
        return new JobBuilder("petReaderMongoJob", jobRepository)
                .start(readPetFromMongo())
                .build();
    }

}

In my case, the database may contain documents that do not fully conform to the target type given to the reader, and I would like to use the fault-tolerant step to skip this dirty data.

Type provided to MongoItemReader

public record PetDomain (
  String name,
  Animal animal
) {}


public enum Animal {
   CAT,
   DOG;
}

Repo class


@Repository
@RequiredArgsConstructor
public class PetRepo {

    private final MongoTemplate mongoTemplate;

    public MongoItemReader<PetDomain> petReader() {

        Map<String, Sort.Direction> sorts = new HashMap<>();

        Query query = new Query();

        var reader = new MongoItemReaderBuilder<PetDomain>()
                .name("petReader")
                .collection("pet")
                .pageSize(500)
                .template(mongoTemplate)
                .targetType(PetDomain.class)
                .sorts(sorts)
                .query(query)
                .build();

        return reader;
    }

}


Example of dirty data in MongoDB

{
  name: "Bingo",
  animal: "CAT2" // Does not conform to enum provided
}

However, doPageRead uses MongoOperations.find, which returns the whole page as a list. Every document in the page is converted to the target type before the list is returned, so a single dirty document makes the call fail and none of the page's items reach the reader.
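The difference between the two retrieval styles can be illustrated without MongoDB at all. The following is a minimal sketch in plain Java, not Spring Data code: `RawPet` and `Pet` are hypothetical stand-ins for a stored document and for PetDomain, `convert` is a hypothetical stand-in for the document mapping, and the sketch assumes the mapping fails with IllegalArgumentException (which is what `Enum.valueOf` throws for an unknown constant).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class EagerVsLazyConversion {

    public enum Animal { CAT, DOG }

    // Hypothetical stand-ins for a raw stored document and the mapped target type.
    public record RawPet(String name, String animal) {}
    public record Pet(String name, Animal animal) {}

    // Stand-in for document mapping; Enum.valueOf throws IllegalArgumentException on "CAT2".
    public static Pet convert(RawPet raw) {
        return new Pet(raw.name(), Animal.valueOf(raw.animal()));
    }

    // Eager, list-style retrieval: every document is converted before anything is
    // returned, so one bad document makes the whole page unavailable.
    public static List<Pet> readPageEagerly(List<RawPet> page) {
        List<Pet> result = new ArrayList<>();
        for (RawPet raw : page) {
            result.add(convert(raw));
        }
        return result;
    }

    // Lazy, stream-style retrieval: conversion happens inside next(), one document
    // at a time, so a failure affects only the document being read.
    public static Iterator<Pet> readPageLazily(List<RawPet> page) {
        Iterator<RawPet> source = page.iterator();
        return new Iterator<>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public Pet next() { return convert(source.next()); }
        };
    }

    // Drains the lazy iterator and drops documents whose conversion fails.
    public static List<String> namesSkippingBadDocuments(List<RawPet> page) {
        List<String> names = new ArrayList<>();
        Iterator<Pet> pets = readPageLazily(page);
        while (pets.hasNext()) {
            try {
                names.add(pets.next().name());
            } catch (IllegalArgumentException e) {
                // The bad document was already consumed by next(); move on to the next one.
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<RawPet> page = List.of(
                new RawPet("Rex", "DOG"),
                new RawPet("Bingo", "CAT2"),
                new RawPet("Tom", "CAT"));

        try {
            readPageEagerly(page);
        } catch (IllegalArgumentException e) {
            System.out.println("eager read failed: " + e.getMessage());
        }
        System.out.println("lazy read kept: " + namesSkippingBadDocuments(page));
    }
}
```

In the eager variant the caller never sees "Rex" or "Tom", because the exception escapes before the list exists. In the lazy variant the failure is confined to the call to `next()` for "Bingo", which is the property the issue is asking for.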

To get an iterator that converts documents one at a time, I had to override the entire doPageRead method just to change the MongoOperations call from find to stream:

@AllArgsConstructor
@NoArgsConstructor
@Setter
public class CustomMongoItemReader<T> extends MongoItemReader<T> {

    private MongoOperations template;
    private Query query;
    private String queryString;
    private Class<? extends T> type;
    private Sort sort;
    private String hint;
    private String fields;
    private String collection;
    private List<Object> parameterValues = new ArrayList<>();

    @Override
    protected Iterator<T> doPageRead() {
        PageRequest pageRequest;
        if (this.queryString != null) {
            pageRequest = PageRequest.of(this.page, this.pageSize, this.sort);
            String populatedQuery = this.replacePlaceholders(this.queryString, this.parameterValues);
            BasicQuery mongoQuery;
            if (StringUtils.hasText(this.fields)) {
                mongoQuery = new BasicQuery(populatedQuery, this.fields);
            } else {
                mongoQuery = new BasicQuery(populatedQuery);
            }

            mongoQuery.with(pageRequest);
            if (StringUtils.hasText(this.hint)) {
                mongoQuery.withHint(this.hint);
            }

            return StringUtils.hasText(this.collection) ?
                    // Changing from `find` to `stream`
                    (Iterator<T>) this.template.stream(mongoQuery, this.type, this.collection).iterator() :
                    (Iterator<T>) this.template.stream(mongoQuery, this.type).iterator();
        } else {
            pageRequest = PageRequest.of(this.page, this.pageSize);
            this.query.with(pageRequest);
            return StringUtils.hasText(this.collection) ?
                    // Changing from `find` to `stream`
                    (Iterator<T>) this.template.stream(this.query, this.type, this.collection).iterator() :
                    (Iterator<T>) this.template.stream(this.query, this.type).iterator();
        }
    }

    private String replacePlaceholders(String input, List<Object> values) {
        ParameterBindingJsonReader reader = new ParameterBindingJsonReader(input, values.toArray());
        DecoderContext decoderContext = DecoderContext.builder().build();
        Document document = (new ParameterBindingDocumentCodec()).decode(reader, decoderContext);
        return document.toJson();
    }

}

I was wondering whether it would be better for MongoItemReader to use stream instead, since find prevents the iterator from being created at all if any document in the page does not conform to the target type.

Environment

  • JDK 17
  • Spring Batch 5.0.3

Expected behavior

If the database contains data that does not conform to the type specified in MongoItemReader, the reader should be able to move on to the next item in the iterator.
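Expressed as a plain-Java sketch, the behavior I expect is roughly the loop below. This is not Spring Batch's implementation: `readWithSkipLimit` and `SkipLimitExceeded` are hypothetical names, and the sketch only mimics the idea of `.skipLimit(5).skip(IllegalArgumentException.class)` applied to a source whose items are converted one at a time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class SkipLimitSketch {

    public enum Animal { CAT, DOG }

    // Hypothetical exception, used only by this sketch.
    public static class SkipLimitExceeded extends RuntimeException {
        public SkipLimitExceeded(int limit, Throwable cause) {
            super("skip limit of " + limit + " exceeded", cause);
        }
    }

    // Converts items one by one; a failed conversion is counted as a skip and the
    // loop moves on, until more than skipLimit items have been skipped.
    public static <S, T> List<T> readWithSkipLimit(
            Iterator<S> source, Function<S, T> converter, int skipLimit) {
        List<T> items = new ArrayList<>();
        int skipped = 0;
        while (source.hasNext()) {
            S raw = source.next();
            try {
                items.add(converter.apply(raw));
            } catch (IllegalArgumentException e) {
                skipped++;
                if (skipped > skipLimit) {
                    throw new SkipLimitExceeded(skipLimit, e);
                }
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> stored = List.of("CAT", "CAT2", "DOG");
        // "CAT2" is skipped; the two valid values are still converted.
        System.out.println(readWithSkipLimit(stored.iterator(), Animal::valueOf, 5));
    }
}
```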

Example Repository

https://github.com/ZJTAN97/spring-batch-mongo-item-reader-issue/tree/main

Thank you for reading the issue!

ZJTAN97, Nov 30 '23 08:11