
Add support for sharding keys in MongoItemWriter

Open soohyun0131-lee opened this issue 2 years ago • 2 comments

Bug description

Using MongoItemWriter on a sharded cluster throws the following error:

org.springframework.data.mongodb.BulkOperationException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. ; nested exception is com.mongodb.MongoBulkWriteException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. 
	at org.springframework.data.mongodb.core.DefaultBulkOperations.bulkWriteTo(DefaultBulkOperations.java:324) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
	at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:560) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
	at org.springframework.data.mongodb.core.DefaultBulkOperations.execute(DefaultBulkOperations.java:290) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
	at org.springframework.batch.item.data.MongoItemWriter.saveOrUpdate(MongoItemWriter.java:169) ~[main/:na]
	at org.springframework.batch.item.data.MongoItemWriter.doWrite(MongoItemWriter.java:138) ~[main/:na]
	at org.springframework.batch.item.data.MongoItemWriter$1.beforeCommit(MongoItemWriter.java:198) ~[main/:na]

I found that the cause is MongoItemWriter's saveOrUpdate method: its upsert query matches on the _id field only, so there is no way to include the shard key.

private void saveOrUpdate(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    MongoConverter mongoConverter = this.template.getConverter();
    FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
    for (Object item : items) {
        Document document = new Document();
        mongoConverter.write(item, document);
        Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
        // the upsert filter matches on _id only; the shard key is never part of the query
        Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId));
        bulkOperations.replaceOne(query, document, upsert);
    }
    bulkOperations.execute();
}
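To make the failure concrete, here is a small dependency-free sketch of the rule the error points at. It assumes the MongoDB 4.2 behavior that an upsert on a sharded collection must match every shard key field by equality. The filter is modeled as a plain Map standing in for the BSON query document, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: models an upsert filter as a Map instead of a BSON Document.
final class ShardKeyCheck {

    private ShardKeyCheck() {
    }

    // True if every shard key field appears in the filter with a plain value.
    // Simplification: any value that is a "$"-operator document (e.g. {"$gt": 5})
    // is treated as not being an exact match.
    static boolean coversFullShardKey(Map<String, Object> filter, List<String> shardKeyFields) {
        for (String field : shardKeyFields) {
            if (!filter.containsKey(field)) {
                return false;
            }
            Object value = filter.get(field);
            if (value instanceof Map && isOperatorDocument((Map<?, ?>) value)) {
                return false;
            }
        }
        return true;
    }

    private static boolean isOperatorDocument(Map<?, ?> map) {
        return map.keySet().stream().anyMatch(k -> String.valueOf(k).startsWith("$"));
    }

    public static void main(String[] args) {
        List<String> shardKey = List.of("shard_key");

        // Shape of the filter saveOrUpdate builds today: {_id: ...} only.
        Map<String, Object> idOnly = new LinkedHashMap<>();
        idOnly.put("_id", "abc");
        System.out.println(coversFullShardKey(idOnly, shardKey)); // false

        // With the shard key value added, the upsert can be targeted.
        Map<String, Object> withShardKey = new LinkedHashMap<>(idOnly);
        withShardKey.put("shard_key", 42L);
        System.out.println(coversFullShardKey(withShardKey, shardKey)); // true
    }
}
```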

As far as I know, to run this upsert query against a sharded MongoDB cluster, we need to include the shard key in the query filter by using addCriteria, as below.

private void saveOrUpdate(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    MongoConverter mongoConverter = this.template.getConverter();
    FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
    for (Object item : items) {
        Document document = new Document();
        mongoConverter.write(item, document);
        Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
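        // SHARD_KEY is a placeholder for the shard key field name; its value is read from the converted document
        Object shardKeyValue = document.get(SHARD_KEY);
        Query query = new Query()
            .addCriteria(Criteria.where(ID_KEY).is(objectId))
            .addCriteria(Criteria.where(SHARD_KEY).is(shardKeyValue));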
        bulkOperations.replaceOne(query, document, upsert);
    }
    bulkOperations.execute();
}

However, I'm not sure what the best way to handle this is, since a MongoDB shard key can consist of multiple fields (a compound shard key).
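One possible way to cope with a compound shard key is to take a list of shard key field names and copy each field's value from the converted document into the filter, next to _id. The sketch below is dependency-free and hypothetical: the converted document is modeled as a Map (standing in for org.bson.Document), the "region" field is made up for the example, only top-level fields are looked up (dotted paths to nested fields are not handled), and a missing shard key value is rejected rather than guessed. In the writer itself, each filter entry would correspond to one Criteria.where(field).is(value) call on the Query.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds an upsert filter that includes a (possibly compound) shard key.
final class ShardKeyFilters {

    static final String ID_KEY = "_id";

    private ShardKeyFilters() {
    }

    // Returns {_id: id, k1: v1, k2: v2, ...} with values taken from the converted document.
    static Map<String, Object> upsertFilter(Map<String, Object> document, Object id, List<String> shardKeyFields) {
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put(ID_KEY, id);
        for (String field : shardKeyFields) {
            if (field.equals(ID_KEY)) {
                continue; // _id is already in the filter, even when it is part of the shard key
            }
            if (!document.containsKey(field)) {
                throw new IllegalArgumentException("Document has no value for shard key field '" + field + "'");
            }
            filter.put(field, document.get(field));
        }
        return filter;
    }

    public static void main(String[] args) {
        Map<String, Object> document = new LinkedHashMap<>();
        document.put(ID_KEY, "abc");
        document.put("region", "eu");
        document.put("shard_key", 42L);
        document.put("source", "batch");

        // Compound shard key {region, shard_key}
        System.out.println(upsertFilter(document, "abc", List.of("region", "shard_key")));
        // prints {_id=abc, region=eu, shard_key=42}
    }
}
```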

P.S. For save, just using insert should be enough if the user specifies the shard key on their @Document class with the @Sharded annotation:

private void save(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    bulkOperations.insert(items);
    bulkOperations.execute();
}

Environment

  • spring-batch 4.3 & 5.0
  • MongoDB sharded cluster (4.2)
  • Kotlin (Java 17)

Steps to reproduce

  • Use MongoItemWriter on a sharded cluster:
    @Bean
    @StepScope
    fun mongoItemWriter(mongoOperations: MongoOperations): MongoItemWriter<MongoItem> {
        return MongoItemWriterBuilder<MongoItem>()
            .collection("mongoItem")
            .template(mongoOperations)
            .build()
    }
@Document("mongoItem")
@Sharded(shardKey = ["shard_key"], shardingStrategy = ShardingStrategy.HASH)
data class MongoItem(
    @Id
    var id: String? = null,
    @Field(name = "shard_key")
    val shardKey: Long,
    val source: String,
)

Expected behavior

  • Data is written to MongoDB without any error.

soohyun0131-lee, Jan 16 '23 05:01

I don't consider this a bug, but a missing feature.

If it is possible to set sharding keys on the query, then we can make the MongoItemWriter configurable with an optional list of sharding keys. I will check with the Spring Data team and update this issue accordingly.
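A rough idea of what "an optional list of sharding keys" could look like, sketched without any Spring dependency: an immutable option holder whose default is an empty list, so the filter stays {_id: ...} exactly as saveOrUpdate builds it today, while a non-empty list adds an equality entry per shard key field. ShardKeyOptions, none(), of() and filterFor() are hypothetical names, not an existing Spring Batch API; the converted document is modeled as a Map, and missing values are not validated in this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical option holder; not part of Spring Batch.
final class ShardKeyOptions {

    private final List<String> shardKeys;

    private ShardKeyOptions(List<String> shardKeys) {
        this.shardKeys = List.copyOf(shardKeys);
    }

    // Default: no shard keys, i.e. the current _id-only upsert filter.
    static ShardKeyOptions none() {
        return new ShardKeyOptions(List.of());
    }

    static ShardKeyOptions of(String... shardKeys) {
        return new ShardKeyOptions(List.of(shardKeys));
    }

    // Builds the upsert filter for one converted document (modeled as a Map).
    Map<String, Object> filterFor(Map<String, Object> document, Object id) {
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put("_id", id);
        for (String key : shardKeys) {
            if (!key.equals("_id")) { // _id is already present
                filter.put(key, document.get(key));
            }
        }
        return filter;
    }
}
```

Keeping the empty list as the default means existing jobs on unsharded collections would see no change in the queries the writer sends.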

fmbenhassine, Apr 03 '23 15:04

Waiting for https://github.com/spring-projects/spring-data-mongodb/issues/4361 to proceed.

fmbenhassine, Apr 11 '23 17:04