
KAFKA-428: Mark the copy complete in the source offset for the last copied document

Open Calvinnix opened this issue 1 year ago • 1 comments

This fixes an issue that was occurring when:

  1. startup mode COPY_EXISTING is used
  2. the copy has completed
  3. but no new event has been sent
  4. then the kafka connector restarts

The resulting behavior of this scenario is that the copy would occur again on restart.


This was happening because the sourceOffsets for copied records specify copy: true, so on restart the connector thinks the copy was still in progress and tries again. (Note that a restart that occurs during a copy is expected to reattempt the copy, which will duplicate data.)

Here is an example of what that copy offset looks like:

{"_id":"{\"_id\": {\"$oid\": \"670ee1efaa5a9af80d592c47\"}, \"copyingData\": true}","copy":"true"}
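The restart decision described above can be sketched roughly as follows. This is only an illustration, not the connector's actual code: the class `CopyOffsetCheckSketch` and the method `copyWasInProgress` are hypothetical names, and the stored offset is modelled as a plain `Map<String, String>`.

```java
import java.util.HashMap;
import java.util.Map;

class CopyOffsetCheckSketch {
    // Hypothetical helper (not the connector's API): returns true when the
    // stored offset still carries the copy flag, which a restart would read
    // as "the copy was in progress".
    static boolean copyWasInProgress(Map<String, String> storedOffset) {
        return storedOffset != null && "true".equals(storedOffset.get("copy"));
    }

    public static void main(String[] args) {
        // Offset shaped like the example above: written for a copied document.
        Map<String, String> lastCopyOffset = new HashMap<>();
        lastCopyOffset.put("_id",
            "{\"_id\": {\"$oid\": \"670ee1efaa5a9af80d592c47\"}, \"copyingData\": true}");
        lastCopyOffset.put("copy", "true");

        // The copy has actually finished, but the flag is still set.
        System.out.println(copyWasInProgress(lastCopyOffset)); // prints true
    }
}
```

Because the last offset written during the copy looks the same as any mid-copy offset, a check of this shape cannot tell "copy finished" apart from "copy interrupted".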

We do have logic that appears to try to "mark copying ended", but it doesn't work when cachedResult is null:

      // Copying finished - mark copying ended and add cached result
      isCopying = false;
      LOGGER.info("Finished copying existing data from the collection(s).");
      if (cachedResult != null) {
        batch.add(cachedResult);
        cachedResult = null;
      }

(I'm not actually sure if this logic ever worked, but I could see a future where the cachedResult concept is removed. I haven't thought about this enough to form a strong enough opinion yet)


The fix is to identify when we are creating the sourceOffset for the last copied document and, in that case, "mark copying ended" by removing the copy: true flag and setting the '_id' field to the cachedResumeToken.
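As a rough sketch of the idea (not the code in this PR), the offset creation could branch on whether the document is the last one copied. The class `CopyOffsetFixSketch`, the method `createSourceOffset`, its parameters, and the placeholder values in `main` are all hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class CopyOffsetFixSketch {
    // Hypothetical helper: builds the source offset for a copied document.
    // For the last copied document the copy flag is omitted and "_id" is
    // set to the cached resume token, so a restart resumes from the token.
    static Map<String, String> createSourceOffset(
            String documentId, String cachedResumeToken, boolean isLastCopiedDocument) {
        Map<String, String> offset = new HashMap<>();
        if (isLastCopiedDocument && cachedResumeToken != null) {
            offset.put("_id", cachedResumeToken);
        } else {
            offset.put("_id", documentId);
            offset.put("copy", "true");
        }
        return offset;
    }

    public static void main(String[] args) {
        String token = "<cached resume token>"; // placeholder value, not a real token

        Map<String, String> midCopy = createSourceOffset("doc-1", token, false);
        Map<String, String> lastCopy = createSourceOffset("doc-2", token, true);

        System.out.println(midCopy.containsKey("copy"));  // prints true
        System.out.println(lastCopy.containsKey("copy")); // prints false
    }
}
```

In this sketch the flag stays on the offset when no resume token is cached, so the worst case is the old behavior (a repeated copy) rather than an offset with no usable resume point. That fallback is an assumption of the sketch, not something the PR states.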

Calvinnix, Oct 16 '24 16:10