spring-data-mongodb
Add support for reactive bulk operations [DATAMONGO-1922]
Bulk upsert would be wonderful, because insertAll fails with a DuplicateKeyException if even one record is a duplicate, and the exception apparently cannot be caught and ignored, as in the following code:
private fun <T: DataModel> insertAllValid(uri: String, transformer: (JSONObject) -> InputModel<T>) {
    getJSON(uri).map(transformer)
        .map(InputModel<T>::toDataModelIfValid)
        .mapNotNull { it!! }
        .collectList()
        // Would be nice to have `::upsertAll` that bulk inserts new + updates existing records
        .flatMapMany(mongoTemplate::insertAll)
        .onErrorResume(DuplicateKeyException::class.java) { e ->
            println(e)
            Flux.empty()
        }
        .doOnNext { logger.debug(it.toString()) }
        .doOnError { logger.error(it.toString()) }
        .subscribe()
}
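To make the requested semantics concrete, here is a minimal in-memory sketch in plain Java. It does not touch MongoDB or Spring Data: `InMemoryCollection`, `insertAll`, and `upsertAll` are hypothetical names used only for illustration, and stopping at the first duplicate is an assumption of this model rather than a statement about the driver.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a collection with a unique key.
// This is an illustration only, not a Spring Data or MongoDB API.
class InMemoryCollection {
    private final Map<String, String> docs = new LinkedHashMap<>();

    // Models the reported behavior: a single duplicate key aborts the batch.
    void insertAll(List<Map.Entry<String, String>> batch) {
        for (Map.Entry<String, String> e : batch) {
            if (docs.containsKey(e.getKey())) {
                throw new IllegalStateException("duplicate key: " + e.getKey());
            }
            docs.put(e.getKey(), e.getValue());
        }
    }

    // Models the requested behavior: insert new keys, overwrite existing ones.
    // Returns the number of keys that were newly inserted.
    int upsertAll(List<Map.Entry<String, String>> batch) {
        int inserted = 0;
        for (Map.Entry<String, String> e : batch) {
            // put returns null when the key was absent (values are never null here)
            if (docs.put(e.getKey(), e.getValue()) == null) {
                inserted++;
            }
        }
        return inserted;
    }

    String get(String key) {
        return docs.get(key);
    }

    int size() {
        return docs.size();
    }

    public static void main(String[] args) {
        InMemoryCollection c = new InMemoryCollection();
        c.insertAll(List.of(Map.entry("a", "1")));
        // "a" already exists, so insertAll would throw here; upsertAll overwrites it.
        int inserted = c.upsertAll(List.of(Map.entry("a", "2"), Map.entry("b", "3")));
        System.out.println(inserted + " inserted, a=" + c.get("a"));
    }
}
```

In this model, the second batch succeeds with `upsertAll` where `insertAll` would have thrown, which is the behavior the request is asking for.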
@tgrushka A nice workaround:
reactiveMongoTemplate.getCollection(config.getCollection()).flatMap(mongoCollection -> {
    var operations = entities.stream().map(entity -> {
        Document doc = new Document();
        reactiveMongoTemplate.getConverter().write(entity, doc);
        var filter = new Document("externalId", entity.getExternalId());
        return new UpdateOneModel<Document>(filter, new Document("$set", doc), new UpdateOptions().upsert(true));
    }).toList();
    return Mono.from(mongoCollection.bulkWrite(operations));
})
@almogtavor Instead of an upsert, how can I do a plain update, like this?
var bulkOps = mongoTemplate.bulkOps()
for (DTO dto : DTOs) {
    Query query = new Query();
    query.addCriteria(Criteria.where(ID).is(dto.getId()));
    Update update = new Update()
        .set(STATUS, dto.getStatus());
    bulkOps.updateOne(query, update);
}
bulkOps.execute();
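On the update-versus-upsert distinction raised here, a rough in-memory model may help. It assumes the only difference is what happens when the filter matches nothing: a plain update writes nothing, while an upsert creates the document. In the workaround above, that switch is the `upsert(true)` option. `SetUpdateModel` and its methods are hypothetical names for illustration and are not part of the MongoDB driver or Spring Data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of updateOne with "$set"; not the MongoDB driver API.
class SetUpdateModel {
    // Documents keyed by id; each document is a mutable field map.
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    // Applies the given fields to the document with this id.
    // With upsert=false a missing id is skipped; with upsert=true it is created.
    // Returns true if a document was matched or created.
    boolean updateOne(String id, Map<String, Object> fieldsToSet, boolean upsert) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            if (!upsert) {
                return false; // no match, nothing written
            }
            existing = new HashMap<>();
            docs.put(id, existing);
        }
        // "$set"-style merge: fields not named in the update stay untouched
        existing.putAll(fieldsToSet);
        return true;
    }

    // Returns the stored document, or null if the id is unknown.
    Map<String, Object> find(String id) {
        return docs.get(id);
    }
}
```

Under these assumptions, setting only a status field on an existing document leaves its other fields intact, and an update for an unknown id is a no-op unless the upsert flag is set.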