Retry remainder of a bulk insert after a failure due to unique indexes
**Describe the bug**
When I perform a bulk write operation (inserting or saving a list) and the list contains duplicates of documents that are already in the database (with a unique index on several attributes of this class), the elements of the list before the duplicate are inserted but those after it are not.
I've thought about a workaround, but it inevitably leads to data loss, especially as it's impossible to know which elements have been inserted and which haven't, since insert or save always returns the same list that was passed in.
For example, I tried to insert (or save) first, catch the MongoBulkWriteException, parse the message to identify which data hadn't been inserted, filter the original array accordingly, and try the insert again. But with this method, the elements before the duplicate had already been inserted on the first attempt, so on the second attempt they became duplicates too.
I tested the performance of looping over the array and inserting the data one by one, but this was about 5 times slower on an array of 10k elements.
**To Reproduce**
Steps to reproduce the behavior:
- Have a model with multiple attributes like name, category, etc.
- Create a unique index on these attributes
- Insert a document
- Try to save / insert a list containing the document already in the database; you can try it at different positions in the list (a condensed sketch follows this list)
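A condensed sketch of these steps with a Morphia Datastore, assuming the TmPacketToStore entity shown further down; newPacket and copyKeyFieldsOf are hypothetical helpers that fill, respectively copy, the indexed fields:

datastore.ensureIndexes();                       // creates the unique compound index declared in @Indexes

TmPacketToStore stored = newPacket(0);           // hypothetical helper setting onBoardTime, sourceSeqCount, apid
datastore.save(stored);                          // one document is now in the database

List<TmPacketToStore> batch = List.of(newPacket(1), copyKeyFieldsOf(stored), newPacket(2));
try {
    datastore.save(batch);                       // ordered bulk write
} catch (MongoBulkWriteException e) {
    // newPacket(1) was inserted, the copy failed with E11000,
    // and newPacket(2) was never attempted
}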
**Expected behavior**
Multiple choices here, assuming there is a unique index on the collection:
- log the E11000 duplicate key error collection but continue to insert the other elements of the array, and return a list of the elements that could not be inserted (or of those that were inserted)
- or return the list of the elements that could not be inserted (that means the duplicates and those after the duplicates) in order to retry
- something else?
**Please complete the following information:**
- Server Version: mongo 7.0.5
- Driver Version: mongodb-driver-sync:4.11.1
- Morphia Version: morphia-core:2.4.11
**Additional context**
Model
@Entity("tmPackets")
@Indexes({@Index(fields = @Field(value = "receptionTime", type = IndexType.DESC)),
@Index(fields = @Field(value = "onBoardTime", type = IndexType.DESC)),
@Index(fields = @Field(value = "packetType", type = IndexType.DESC)),
@Index(
fields = {@Field(value = "onBoardTime", type = IndexType.DESC),
@Field(value = "sourceSeqCount", type = IndexType.DESC),
@Field(value = "apid", type = IndexType.DESC)},
options = @IndexOptions(unique = true))})
public class TmPacketToStore {
/**
* ID of the packet. This field is only used after storing.
*/
@Id
@JsonSerialize(using = ObjectIdSerializer.class)
@JsonDeserialize(using = ObjectIdDeserializer.class)
private ObjectId id;
/**
* Raw binary values of the packet.
*/
private byte[] rawPacket;
/**
* The ground reception time.
*/
private Instant receptionTime;
/**
* The onboard time.
*/
private Instant onBoardTime;
/**
* The type of the packet.
*/
private String packetType;
/**
* The APID of the packet.
*/
private long apid;
/**
* The source sequence counter.
*/
private long sourceSeqCount;
}
DAO
/**
* Insert telemetry packets in database.
*
* @param tmPackets Tm packets to insert
*/
public List<ObjectId> insertBatchTmPackets(List<TmPacketToStore> tmPackets) {
log.debug("Inserting TM packets by batch {}",
tmPackets.stream().map(TmPacketToStore::getPacketType).toList());
List<ObjectId> ids;
List<TmPacketToStore> tmStored = new ArrayList<>();
try {
tmStored = this.dataStore.save(tmPackets);
} catch (MongoBulkWriteException we) {
List<BulkWriteError> errors = we.getWriteErrors();
for (BulkWriteError error : errors) {
String errorMsg = error.getMessage();
TmPacketToStore duplicateTmFound = findAssociatedTmPacket(tmPackets, errorMsg);
if (duplicateTmFound != null) {
log.warn(errorMsg);
log.warn("Cannot insert duplicate TmPacket {} OnBoardTime({}) SrcSeqCount({}) APID({})",
duplicateTmFound.getPacketType(), duplicateTmFound.getOnBoardTime(),
duplicateTmFound.getSourceSeqCount(), duplicateTmFound.getApid());
}
}
} catch (Exception e) {
log.error("Unhandled exception {}", e.toString());
throw e;
}
ids = tmStored.stream().map(TmPacketToStore::getId).toList();
log.debug("Inserted {} TM packets {}", tmPackets.size(), tmPackets);
return ids;
}
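findAssociatedTmPacket is not shown in the DAO above; a minimal sketch of what it could look like, assuming the duplicate key values appear verbatim in the E11000 message (the exact message format is not guaranteed by the driver, so any parsing of it is fragile):

/**
 * Best-effort match of a packet against an E11000 message, which typically ends with
 * something like "dup key: { onBoardTime: ..., sourceSeqCount: 3, apid: 42 }".
 */
private TmPacketToStore findAssociatedTmPacket(List<TmPacketToStore> tmPackets, String errorMsg) {
    return tmPackets.stream()
            .filter(p -> errorMsg.contains("sourceSeqCount: " + p.getSourceSeqCount())
                    && errorMsg.contains("apid: " + p.getApid()))
            .findFirst()
            .orElse(null);
}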
Test
@Test
void insertDuplicatesTm() throws InterruptedException {
final int packetsCount = 10;
final int expectedPacketsCount = 9;
final byte[] redacted = REDACTED_BYTE_ARRAY;
List<TmPacketToStore> packetsToStore = new ArrayList<>();
Instant onBoardTime = Instant.now();
for (int i = 0; i < packetsCount; i++) {
TmPacketToStore packet = new TmPacketToStore();
packet.setPacketType("REDACTED");
packet.setRawPacket(redacted);
packet.setSourceSeqCount(i);
packet.setReceptionTime(Instant.now());
packet.setOnBoardTime(onBoardTime.plusSeconds(i * 10));
packetsToStore.add(packet);
System.out.println(packet);
}
// Create duplicate
TmPacketToStore toDup = packetsToStore.get(1);
TmPacketToStore fromDup = packetsToStore.get(0);
fromDup.setOnBoardTime(toDup.getOnBoardTime());
fromDup.setReceptionTime(toDup.getReceptionTime());
fromDup.setSourceSeqCount(toDup.getSourceSeqCount());
fromDup.setApid(toDup.getApid());
tmDao.insertBatchTmPackets(packetsToStore);
int sleepCount = 0;
// This is the only way to do the test without large Thread sleep in an efficient way
while (sleepCount != 100) {
Thread.sleep(1); // NOSONAR
sleepCount++;
}
List<TmPacketToStore> selectResult = tmDao.getLastTmPacketsByReceptionTime(10);
System.out.println("count inserted " + selectResult.size());
for (TmPacketToStore packet : selectResult) {
System.out.print(packet);
}
assertEquals(expectedPacketsCount, selectResult.size());
}
This exception comes directly from the driver so there's not a lot of wiggle room from morphia's side to manage this. However, the exception does list the index of the failed insert so what can be done is to retry with a new list from just after that index value:
try {
datastore.save(packetsToStore);
} catch (MongoBulkWriteException bulkWriteException) {
BulkWriteError bulkWriteError = bulkWriteException.getWriteErrors().get(0);
datastore.save(packetsToStore.subList(bulkWriteError.getIndex() + 1, packetsToStore.size()));
}
This is something I can consider adding a configuration option for in, say, 3.0 but I can't in a patch version as it's a violation of semver. But if you update your DAO with this kind of logic, you should be all set for now.
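The snippet above only skips past the first failure, so it throws again if the remainder contains another duplicate. A minimal sketch of the same idea in a loop, assuming ordered writes (the default), where each reported index is relative to the list passed to that particular save call:

List<TmPacketToStore> remaining = packetsToStore;
while (!remaining.isEmpty()) {
    try {
        datastore.save(remaining);
        break;                                   // everything left was inserted
    } catch (MongoBulkWriteException e) {
        // An ordered bulk write stops at the first error: elements before the index were
        // inserted, the element at the index is the duplicate, the rest was never attempted.
        int failed = e.getWriteErrors().get(0).getIndex();
        remaining = remaining.subList(failed + 1, remaining.size());
    }
}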
It's something I thought about, but it simply meant inserting one by one whenever a complete list was duplicated, which caused performance problems. As a workaround, it was faster to filter out the duplicates and then insert the filtered list (sketched after the list below):
- filter duplicates inside the list based on a unique combination of properties
- if the list is still too big, partition it
- for each partition, check whether the unique combination of properties is already in the database with a LogicalFilter query (fast thanks to the compound index) and remove matching entries from the list
- bulk insert the filtered list
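A sketch of that workaround against the TmPacketToStore entity above, assuming Morphia 2.x filters (dev.morphia.query.filters.Filters); the partition size and the string key used for in-memory deduplication are illustrative:

import dev.morphia.Datastore;
import dev.morphia.query.filters.Filter;
import dev.morphia.query.filters.Filters;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public List<TmPacketToStore> insertWithoutDuplicates(Datastore datastore, List<TmPacketToStore> tmPackets) {
    // 1. Filter duplicates inside the list itself, keyed on the unique-index fields.
    Map<String, TmPacketToStore> byKey = new LinkedHashMap<>();
    for (TmPacketToStore p : tmPackets) {
        byKey.putIfAbsent(p.getOnBoardTime() + "|" + p.getSourceSeqCount() + "|" + p.getApid(), p);
    }
    List<TmPacketToStore> deduped = new ArrayList<>(byKey.values());

    // 2. Partition the deduplicated list so each existence query stays small.
    final int partitionSize = 1_000; // illustrative value
    List<TmPacketToStore> toInsert = new ArrayList<>();
    for (int from = 0; from < deduped.size(); from += partitionSize) {
        List<TmPacketToStore> partition =
                deduped.subList(from, Math.min(from + partitionSize, deduped.size()));

        // 3. One LogicalFilter query per partition: an OR over the AND of the
        //    unique-index fields, served by the compound index.
        Filter[] keyFilters = partition.stream()
                .map(p -> Filters.and(
                        Filters.eq("onBoardTime", p.getOnBoardTime()),
                        Filters.eq("sourceSeqCount", p.getSourceSeqCount()),
                        Filters.eq("apid", p.getApid())))
                .toArray(Filter[]::new);
        List<TmPacketToStore> existing = datastore.find(TmPacketToStore.class)
                .filter(Filters.or(keyFilters))
                .iterator()
                .toList();

        // Keep only the packets whose key is not already in the database.
        partition.stream()
                .filter(p -> existing.stream().noneMatch(e ->
                        e.getOnBoardTime().equals(p.getOnBoardTime())
                                && e.getSourceSeqCount() == p.getSourceSeqCount()
                                && e.getApid() == p.getApid()))
                .forEach(toInsert::add);
    }

    // 4. Bulk insert the filtered list.
    return toInsert.isEmpty() ? toInsert : datastore.save(toInsert);
}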