
Enable MongoDB sink with unique fields in a MongoDB sharded cluster

Open • davidch93 opened this issue 2 years ago • 0 comments

Updates:

  • Add a new toggle (isUseFilterInValueDoc) to MongoDbUpdate that enables using the filter field provided by Debezium.
  • Add a new class (MongoDbUniqueFieldHandler) that provides the default operations with isUseFilterInValueDoc enabled.
  • Update MongoDbHandler and ChangeStreamHandler so that their default operations run with isUseFilterInValueDoc disabled.
  • Add new unit tests.
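A minimal sketch of how the isUseFilterInValueDoc toggle could choose the upsert filter. It assumes documents are modeled as plain `Map<String, Object>` values already parsed from Extended JSON rather than BSON types, and the names `UpsertFilterSelector` and `selectFilter` are hypothetical; this is not the connector's actual MongoDbUpdate code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating the toggle; not the real MongoDbUpdate implementation.
final class UpsertFilterSelector {

    /**
     * @param keyId                 document id taken from the Debezium record key
     * @param valueFilter           parsed "filter" field from the record value, or null if absent
     * @param isUseFilterInValueDoc the proposed toggle
     */
    static Map<String, Object> selectFilter(
            Object keyId, Map<String, Object> valueFilter, boolean isUseFilterInValueDoc) {
        if (isUseFilterInValueDoc && valueFilter != null && !valueFilter.isEmpty()) {
            // Toggle enabled: reuse Debezium's filter, which also carries the unique fields.
            return new LinkedHashMap<>(valueFilter);
        }
        // Toggle disabled (or no filter available): match on _id only.
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put("_id", keyId);
        return filter;
    }

    public static void main(String[] args) {
        Map<String, Object> debeziumFilter = new LinkedHashMap<>();
        debeziumFilter.put("transaction_id", 1234L);
        debeziumFilter.put("_id", "63a3edd806add30266cb831f");

        System.out.println(selectFilter("63a3edd806add30266cb831f", debeziumFilter, true));
        System.out.println(selectFilter("63a3edd806add30266cb831f", debeziumFilter, false));
    }
}
```

Falling back to the `_id`-only filter when the value document has no filter keeps the existing behavior as the default path.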

Problem Statements

Our use case is migrating data to a new MongoDB sharded cluster using CDC (Debezium). When replicating data to the new cluster, we got the following shard key error:

Failed to target upsert by query :: could not extract exact shard key

Some collections in our existing sharded cluster have unique fields, and the new sharded cluster uses the same unique fields. The error occurs because the MongoDB Sink connector does not include these unique fields in the filter when upserting data.
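To make the failure concrete: as we understand MongoDB's rules, an upsert on a sharded collection can only be routed when its filter has an equality match on every shard key field. The sketch below models that check over plain maps. `ShardKeyCheck` and `hasFullShardKey` are hypothetical names, the shard key `{transaction_id, _id}` is an assumption based on the Debezium filter in the Solution section, and filter values are assumed to be already parsed from Extended JSON.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of the shard-targeting rule; not MongoDB's actual router logic.
final class ShardKeyCheck {

    // Returns true if the filter pins every shard key field to a single exact value.
    static boolean hasFullShardKey(Map<String, Object> filter, List<String> shardKeyFields) {
        for (String field : shardKeyFields) {
            if (!filter.containsKey(field)) {
                return false; // missing field: the exact shard key cannot be extracted
            }
            Object value = filter.get(field);
            if (value instanceof Map<?, ?> && isOperatorDocument((Map<?, ?>) value)) {
                return false; // e.g. {"$gt": ...} is a range, not an exact value
            }
        }
        return true;
    }

    // A sub-document whose keys start with "$" is treated as a query operator.
    private static boolean isOperatorDocument(Map<?, ?> doc) {
        for (Object key : doc.keySet()) {
            if (key instanceof String && ((String) key).startsWith("$")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> shardKey = List.of("transaction_id", "_id");
        Map<String, Object> idOnly = Map.of("_id", "63a3edd806add30266cb831f");
        Map<String, Object> debeziumFilter =
                Map.of("transaction_id", 1234L, "_id", "63a3edd806add30266cb831f");

        System.out.println(hasFullShardKey(idOnly, shardKey));         // false
        System.out.println(hasFullShardKey(debeziumFilter, shardKey)); // true
    }
}
```

An `_id`-only filter fails the check, which matches the "could not extract exact shard key" error above.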

Solution

As you may know, Debezium includes a filter field in its payload, as shown below.

{
  "after": null,
  "patch": "{\"$v\": 1,\"$set\": {\"updated_at\": {\"$date\": 1678606630819}}}",
  "filter": "{\"transaction_id\": {\"$numberLong\": \"1234\"},\"_id\": {\"$oid\": \"63a3edd806add30266cb831f\"}}"
}

We therefore propose using this filter information as the upsert filter, so that the unique fields are included when upserting data.
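The effect of the proposal can be illustrated with a small in-memory model of upsert semantics: when no document matches, MongoDB builds the inserted document from the filter's equality fields and then applies `$set`, so using Debezium's filter carries `transaction_id` into the new document. This is a hypothetical sketch (`UpsertSimulation` is not part of the connector or the MongoDB driver); it assumes the filter contains only equality conditions with values already parsed from Extended JSON, and models `updated_at` as epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory stand-in for an upsert with a {"$set": ...} update.
final class UpsertSimulation {

    static void upsert(List<Map<String, Object>> collection,
                       Map<String, Object> filter,
                       Map<String, Object> setFields) {
        for (Map<String, Object> doc : collection) {
            if (matches(doc, filter)) {
                doc.putAll(setFields); // update path: apply $set to the first matching document
                return;
            }
        }
        // Insert path: seed the new document with the filter's equality fields, then apply $set.
        Map<String, Object> inserted = new LinkedHashMap<>(filter);
        inserted.putAll(setFields);
        collection.add(inserted);
    }

    // Equality-only matching; Objects.equals treats a null filter value as matching a missing field.
    private static boolean matches(Map<String, Object> doc, Map<String, Object> filter) {
        for (Map.Entry<String, Object> e : filter.entrySet()) {
            if (!Objects.equals(e.getValue(), doc.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> collection = new ArrayList<>();
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put("transaction_id", 1234L);
        filter.put("_id", "63a3edd806add30266cb831f");

        Map<String, Object> set = new LinkedHashMap<>();
        set.put("updated_at", 1678606630819L);

        upsert(collection, filter, set); // no match yet: inserts a document with transaction_id
        upsert(collection, filter, set); // matches the inserted document: updates in place
        System.out.println(collection);
    }
}
```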

davidch93, Mar 15 '23 10:03