kafka-connect-mongodb
Error with sharded MongoDB
Hi, we use your connector to send data to MongoDB. Initially MongoDB was installed in replica set mode on a 3-node cluster; it has now been changed to sharded mode. Since this change we receive the following error:
Failed to target upsert by query :: could not extract exact shard key
This is an example of our connector configuration:
{
"name": <name>,
"config": {
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"key.converter.schemas.enable": "false",
"topics": <nametopic>,
"mongodb.connection.uri": <mongoconnectionurl>,
"mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy",
"value.converter.schemas.enable": "false",
"transforms": "WrapKey",
"max.num.retries":15,
"retries.defer.timeout":60000,
"tasks.max":2,
"name": <...>,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"mongodb.collection": <name collection>,
"errors.tolerance": "all",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": <...>,
"errors.deadletterqueue.context.headers.enable": "true",
"transforms.WrapKey.field": "_id",
"transforms.WrapKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
The shard key is composed of 3 different fields. How can we configure this connector so that it can write to the sharded MongoDB?
UPDATE
We tried another strategy:
{
"name": "mongodb-connector-coll-event",
"config": {
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"topics": "....",
"mongodb.connection.uri": "....",
"max.num.retries":15,
"retries.defer.timeout":60000,
"tasks.max":2,
"name": "...",
"mongodb.collection": "event",
"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
"mongodb.key.projection.list": "Field1,Field2",
"mongodb.key.projection.type": "whitelist",
"mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"errors.tolerance": "all",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "...",
"errors.deadletterqueue.context.headers.enable": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
With this configuration the connector doesn't report any error, but it also doesn't write any entries to MongoDB.
We use this approach because we need to set the "_id" field in MongoDB, and according to the README (https://github.com/hpgrahsl/kafka-connect-mongodb#use-case-1-employing-business-keys) this procedure allows it.
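For context on the original error: on a sharded collection, MongoDB requires the filter of an upsert to contain the full shard key. Since our shard key has three fields while the projection list above names only two, one variant worth considering is to list all three. The fragment below is only a sketch of that idea, not a confirmed fix. It assumes that Field1 and Field2 are shard key fields, that a hypothetical third field named Field3 completes the shard key, and that all three are present in the record value. Only the affected settings are shown; everything else stays as in the configuration above.

```json
{
  "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
  "mongodb.key.projection.list": "Field1,Field2,Field3",
  "mongodb.key.projection.type": "whitelist",
  "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy"
}
```

Whether the connector then builds a filter that the router accepts as the exact shard key is something we have not been able to confirm.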
Thanks