mongo-kafka
Enable MongoDB sink with unique fields in a MongoDB sharded cluster
Updates:
- Create a new toggle (`isUseFilterInValueDoc`) in `MongoDbUpdate` to enable using the `filter` field provided by Debezium.
- Create a new class (`MongoDbUniqueFieldHandler`) that provides the default operations when `isUseFilterInValueDoc` is enabled.
- Update `MongoDbHandler` and `ChangeStreamHandler` to adjust the default operations when `isUseFilterInValueDoc` is disabled.
- Add new unit tests.
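A minimal Python sketch of the filter selection that the toggle controls, assuming the Debezium record key carries the document id as an Extended JSON string under `id`, and the record value carries a `filter` string like the one in the Solution section. The function name `choose_update_filter` is hypothetical and is not the connector's Java API; falling back to `_id` when `filter` is missing is a choice made only for this sketch.

```python
import json


def choose_update_filter(key_doc, value_doc, use_filter_in_value_doc):
    """Pick the query filter for an upsert built from a Debezium update event.

    Hypothetical helper. Assumes key_doc["id"] is an Extended JSON string
    such as '{"$oid": "..."}' and value_doc may contain a "filter" string.
    """
    if use_filter_in_value_doc and value_doc.get("filter"):
        # Toggle enabled: reuse Debezium's filter, which may carry
        # additional fields (e.g. unique/shard key fields) besides _id.
        return json.loads(value_doc["filter"])
    # Toggle disabled, or no filter present: match on _id only.
    return {"_id": json.loads(key_doc["id"])}
```

With the toggle off, the result is always an `_id`-only filter; with it on, the whole Debezium `filter` document is used as-is.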
Problem Statements
Our use case is migrating data to a new MongoDB sharded cluster using change data capture (CDC) with Debezium. When replicating data to the new cluster, the sink connector failed with the following shard key error:
```
Failed to target upsert by query :: could not extract exact shard key
```
Some collections in our existing sharded cluster have unique fields enabled, and the new sharded cluster uses the same unique fields. The error occurs because the MongoDB sink connector does not include these unique fields in the filter when upserting data, so the upsert cannot be targeted at a single shard.
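To illustrate why an `_id`-only upsert fails, the following simplified check asks whether a query filter names every shard key field. The shard key `transaction_id` is hypothetical (borrowed from the sample `filter` in the Solution section), and mongos applies stricter targeting rules than this presence check, for example requiring equality matches.

```python
# Hypothetical shard key for illustration; the real one is not stated here.
SHARD_KEY_FIELDS = ("transaction_id",)


def has_full_shard_key(query_filter, shard_key_fields=SHARD_KEY_FIELDS):
    """Simplified check: does the filter name every shard key field at top level?

    This only models the presence requirement behind the error above; it does
    not check operators, dotted paths, or hashed shard keys.
    """
    return all(field in query_filter for field in shard_key_fields)
```

An `_id`-only filter fails this check, mirroring the error, while a filter that also carries `transaction_id` passes.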
Solution
Debezium provides a `filter` field in its payload, as shown below.
```json
{
  "after": null,
  "patch": "{\"$v\": 1,\"$set\": {\"updated_at\": {\"$date\": 1678606630819}}}",
  "filter": "{\"transaction_id\": {\"$numberLong\": \"1234\"},\"_id\": {\"$oid\": \"63a3edd806add30266cb831f\"}}"
}
```
We therefore want to use the `filter` document as the query filter when upserting data, so that the write carries the same unique fields and can be routed to the correct shard.
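A minimal sketch of the upsert this implies, written in Python against the sample payload: the `filter` string becomes the query and the `patch` string becomes the update, shaped like an entry of the `updates` array of MongoDB's `update` command (`q`, `u`, `upsert`). The function name is hypothetical. Extended JSON wrappers such as `$oid`, `$numberLong`, and `$date` are left undecoded here; in the Java connector, `BsonDocument.parse` from `org.bson` would decode them. Dropping the internal `$v` oplog version marker is an assumption of this sketch.

```python
import json


def build_upsert_from_debezium_update(value_doc):
    """Turn a Debezium update event into an update-command entry (sketch).

    Hypothetical helper. Returns {"q": filter, "u": update, "upsert": True}.
    Extended JSON values are kept as plain dicts, not decoded to BSON types.
    """
    query_filter = json.loads(value_doc["filter"])
    update = json.loads(value_doc["patch"])
    # Assumption: "$v" is the oplog format version, not an update operator,
    # so it is removed before the patch is used as an update document.
    update.pop("$v", None)
    return {"q": query_filter, "u": update, "upsert": True}
```

Because the query now includes `transaction_id` alongside `_id`, the resulting upsert names the unique field that the `_id`-only filter was missing.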