mongo-kafka
KAFKA-428: Mark the copy complete in the source offset for the last copied document
This fixes an issue that occurred when:
- startup mode COPY_EXISTING is used
- the copy has completed
- but no new data has been sent
- then the Kafka connector restarts
In this scenario, the copy would run again on restart.
This was happening because sourceOffsets for copied records specify copy: true, so on restart the connector thinks that the copy was still in progress and tries again. (Note that a restart that occurs during a copy is expected to reattempt the copy, which will duplicate data.)
Here is an example of what that copy offset looks like:
{"_id":"{\"_id\": {\"$oid\": \"670ee1efaa5a9af80d592c47\"}, \"copyingData\": true}","copy":"true"}
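To make the restart behavior concrete, here is a minimal sketch of how a stored offset like the one above could be read as "copy still in progress". The class and method names (CopyOffsetCheck, copyInProgress) are hypothetical and only for illustration; this is not the connector's actual code, and the change stream offset value is a placeholder.

```java
import java.util.Map;

public class CopyOffsetCheck {

    // Hypothetical helper: true when the stored offset still carries the copy flag.
    static boolean copyInProgress(Map<String, ?> offset) {
        return offset != null && "true".equals(offset.get("copy"));
    }

    public static void main(String[] args) {
        // Offset written for a copied document (same shape as the example above).
        Map<String, String> copyOffset = Map.of(
            "_id", "{\"_id\": {\"$oid\": \"670ee1efaa5a9af80d592c47\"}, \"copyingData\": true}",
            "copy", "true");

        // Offset without the copy flag; the token value is a placeholder.
        Map<String, String> streamOffset = Map.of("_id", "{\"_data\": \"<resume token>\"}");

        System.out.println(copyInProgress(copyOffset));   // prints "true"
        System.out.println(copyInProgress(streamOffset)); // prints "false"
    }
}
```

If the last offset committed before a restart is the first kind, a check like this would conclude that the copy never finished, which matches the behavior described above.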
There is existing logic that appears to try to "mark copying ended", but it has no effect on the stored offset when the cachedResult is null.
    // Copying finished - mark copying ended and add cached result
    isCopying = false;
    LOGGER.info("Finished copying existing data from the collection(s).");
    if (cachedResult != null) {
      batch.add(cachedResult);
      cachedResult = null;
    }
(I'm not actually sure if this logic ever worked, but I could see a future where the cachedResult concept is removed. I haven't thought about this enough to form a strong enough opinion yet)
The fix for this issue was to identify when we are creating the sourceOffset for the last copied document and in that case go ahead and "mark copying ended" by removing the copy: true flag and setting the '_id' field to the cachedResumeToken.
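The idea can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this PR: the class name, method name, and parameters are hypothetical, and the fallback used when no resume token is cached is an assumption the description does not cover.

```java
import java.util.HashMap;
import java.util.Map;

public class SourceOffsetSketch {

    // Hypothetical sketch: build the source offset for a copied document.
    static Map<String, String> createSourceOffset(
            String documentIdJson, boolean isLastCopiedDocument, String cachedResumeTokenJson) {
        Map<String, String> offset = new HashMap<>();
        if (isLastCopiedDocument && cachedResumeTokenJson != null) {
            // Last copied document: mark copying ended by omitting the copy flag
            // and pointing "_id" at the cached resume token.
            offset.put("_id", cachedResumeTokenJson);
        } else {
            // Any other copied document keeps the copy flag, so a restart
            // during the copy still reattempts it.
            // Assumption: this branch is also used when no resume token is cached.
            offset.put("_id", documentIdJson);
            offset.put("copy", "true");
        }
        return offset;
    }

    public static void main(String[] args) {
        String docId = "{\"_id\": {\"$oid\": \"670ee1efaa5a9af80d592c47\"}, \"copyingData\": true}";
        String token = "{\"_data\": \"<resume token>\"}"; // placeholder value

        System.out.println(createSourceOffset(docId, false, token).containsKey("copy")); // prints "true"
        System.out.println(createSourceOffset(docId, true, token).containsKey("copy"));  // prints "false"
    }
}
```

With an offset shaped like the second case committed for the final copied record, a restart no longer sees copy: true and can resume from the token instead of copying again.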