
Add support for reactive bulk operations [DATAMONGO-1922]

Open spring-projects-issues opened this issue 7 years ago • 3 comments

Mark Paluch opened DATAMONGO-1922 and commented


1 vote, 2 watchers

spring-projects-issues avatar Apr 09 '18 07:04 spring-projects-issues

Bulk upsert would be wonderful: insertAll fails with a DuplicateKeyException if even one record is a duplicate, and I cannot seem to catch and ignore that exception, as in the following code:

private fun <T: DataModel> insertAllValid(uri: String, transformer: (JSONObject) -> InputModel<T>) {
    getJSON(uri).map(transformer)
        // mapNotNull drops invalid inputs; a plain map would throw on a null result
        .mapNotNull { it.toDataModelIfValid() }
        .collectList()
        // Would be nice to have `::upsertAll` that bulk inserts new + updates existing records
        .flatMapMany(mongoTemplate::insertAll)
        .onErrorResume(DuplicateKeyException::class.java) { e ->
            println(e)
            Flux.empty()
        }
        .doOnNext { logger.debug(it.toString()) }
        .doOnError { logger.error(it.toString()) }
        .subscribe()
}

tgrushka avatar May 07 '22 23:05 tgrushka

@tgrushka A nice workaround:

reactiveMongoTemplate.getCollection(config.getCollection()).flatMap(mongoCollection -> {
        var operations = entities.stream().map(entity -> {
            // Convert the entity to a raw Document with the template's converter
            Document doc = new Document();
            reactiveMongoTemplate.getConverter().write(entity, doc);
            // Match on the business key; upsert(true) inserts when nothing matches
            var filter = new Document("externalId", entity.getExternalId());
            return new UpdateOneModel<Document>(filter, new Document("$set", doc), new UpdateOptions().upsert(true));
        }).toList();
        // bulkWrite returns a Publisher, so wrap it in a Mono
        return Mono.from(mongoCollection.bulkWrite(operations));
    })

almogtavor avatar May 08 '22 21:05 almogtavor

@almogtavor Instead of an upsert, how can I do a plain bulk update, like this?

var bulkOps = mongoTemplate.bulkOps();
for (DTO dto : DTOs) {
    Query query = new Query();
    query.addCriteria(Criteria.where(ID).is(dto.getId()));
    Update update = new Update().set(STATUS, dto.getStatus());
    bulkOps.updateOne(query, update);
}
bulkOps.execute();

argmnt avatar May 20 '22 14:05 argmnt
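
On the update-only question: in the workaround above, the `new UpdateOptions().upsert(true)` argument is what turns each `UpdateOneModel` into an upsert. Building the model from just a filter and an update (the driver's default is `upsert = false`) should leave unmatched documents untouched. The sketch below is only a plain-Java, in-memory model of that difference, not the MongoDB driver or Spring Data API; the class `BulkSemanticsSketch`, its `UpdateOp` and `Result` types, and the `externalId`/`status` fields are all hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of bulk write semantics. NOT the MongoDB driver; all names are hypothetical. */
public class BulkSemanticsSketch {

    /** One bulk entry: match on externalId, set status, optionally upsert. */
    static final class UpdateOp {
        final String externalId;
        final String status;
        final boolean upsert;

        UpdateOp(String externalId, String status, boolean upsert) {
            this.externalId = externalId;
            this.status = status;
            this.upsert = upsert;
        }
    }

    /** Counters loosely mirroring what a bulk write result reports. */
    static final class Result {
        int matched;
        int upserted;
    }

    /** Rough model of an ordered insertAll: the first duplicate key aborts the call. */
    static void insertAll(Map<String, String> collection, Map<String, String> docs) {
        for (Map.Entry<String, String> e : docs.entrySet()) {
            if (collection.containsKey(e.getKey())) {
                throw new IllegalStateException("duplicate key: " + e.getKey());
            }
            collection.put(e.getKey(), e.getValue());
        }
    }

    /** Applies each op: update on match, insert only when upsert is set, otherwise skip. */
    static Result bulkWrite(Map<String, String> collection, List<UpdateOp> ops) {
        Result result = new Result();
        for (UpdateOp op : ops) {
            if (collection.containsKey(op.externalId)) {
                collection.put(op.externalId, op.status);
                result.matched++;
            } else if (op.upsert) {
                collection.put(op.externalId, op.status);
                result.upserted++;
            }
            // No match and upsert == false: the op is a no-op.
        }
        return result;
    }

    public static void main(String[] args) {
        // Update-only: "a" exists and is updated, "b" is missing and is skipped.
        Map<String, String> updatesOnly = new LinkedHashMap<>();
        updatesOnly.put("a", "OLD");
        Result r1 = bulkWrite(updatesOnly, Arrays.asList(
                new UpdateOp("a", "NEW", false),
                new UpdateOp("b", "NEW", false)));
        System.out.println(updatesOnly + " matched=" + r1.matched + " upserted=" + r1.upserted);

        // Upsert: "a" is updated and "b" is inserted.
        Map<String, String> upserts = new LinkedHashMap<>();
        upserts.put("a", "OLD");
        Result r2 = bulkWrite(upserts, Arrays.asList(
                new UpdateOp("a", "NEW", true),
                new UpdateOp("b", "NEW", true)));
        System.out.println(upserts + " matched=" + r2.matched + " upserted=" + r2.upserted);
    }
}
```

The model ignores ordering options, write errors, and partial failures, so it only illustrates which documents end up written. Against the real driver, the equivalent change to the workaround would be dropping the third constructor argument.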