kafka-connect-cosmosdb
ID strategies do not appear to be working correctly
Description
I have read and tried nearly everything I can find, but I cannot sink messages produced by the Debezium SQL Server connector (debezium-connector-sqlserver:1.6.0); every attempt fails with an error that the JsonPath $.id cannot be evaluated.
I have ensured the SQL table uses id (lower case) as its primary key.
I have tried "ProvidedInValueStrategy" and "ProvidedInKeyStrategy" as well as the default "FullKeyStrategy" ID strategies.
I can sink messages generated by the Cosmos DB source connector, but any attempt to sink from the SQL Server connector fails.
This may come down to improper configuration, and I suspect the message structure the SQL Server connector produces is the cause, but I cannot identify what to change to fix it.
Expected Behavior
The CosmosDB Sink Connector should be able to consume messages produced from the Debezium SQL Server Connector.
Reproduce
SQL Server table create script
CREATE TABLE tenants (
    id INT PRIMARY KEY IDENTITY (1, 1),
    [name] text NOT NULL,
    customer_id UNIQUEIDENTIFIER NOT NULL
);
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'tenants',
    @role_name = NULL,
    @supports_net_changes = 1
GO
The sink connector is configured as follows (variables are substituted at deploy time) and starts successfully:
{
"connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
"tasks.max": "1",
"topics": "$TOPICS",
"input.data.format": "JSON",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connect.cosmos.connection.endpoint": "https://${TDM_COSMOS_NAME}.documents.azure.com:443/",
"connect.cosmos.master.key": "$TDM_COSMOS_MKEY",
"connect.cosmos.databasename": "$DBNAME",
"connect.cosmos.containers.topicmap": "$MAP",
"connect.cosmos.offset.useLatest": false,
"cosmos.id.strategy": "ProvidedInValueStrategy" -- or InKeyStrategy or FullKeyStrategy
}
I have ensured that the SQL Server connector is also configured to use JsonConverter with schemas disabled.
Topic message key produced. Even with ProvidedInKeyStrategy I get the same error, even though the key contains only the "id" field:
{
"id": 1
}
Topic message value produced. I know there is no root-level id; this is the structure the Debezium SQL Server source connector produces:
{
"before": null,
"after": {
"id": 1,
"name": "tenant",
"customer_id": "3189BE51-22A3-451E-A778-01C10439B769"
},
"source": {
"version": "1.6.0.Final",
"connector": "sqlserver",
"name": "mssql",
"ts_ms": 1631883768203,
"snapshot": "false",
"db": "tenancy",
"sequence": null,
"schema": "dbo",
"table": "tenants",
"change_lsn": "00000025:00000670:0037",
"commit_lsn": "00000025:00000670:0038",
"event_serial_no": 1
},
"op": "c",
"ts_ms": 1631883773104,
"transaction": null
}
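To illustrate the mismatch: the Debezium envelope above has no root-level "id", so a lookup of $.id fails while $.after.id would succeed. A minimal Python sketch (a hand-rolled path walk standing in for the jayway JsonPath library the connector uses, not the connector's own code):

```python
# The Debezium change event, trimmed to the fields that matter here.
debezium_value = {
    "before": None,
    "after": {"id": 1, "name": "tenant",
              "customer_id": "3189BE51-22A3-451E-A778-01C10439B769"},
    "op": "c",
}

def read_path(doc, path):
    """Tiny stand-in for a JsonPath lookup: walks dot-separated keys."""
    node = doc
    for key in path.lstrip("$.").split("."):
        if not isinstance(node, dict) or key not in node:
            raise KeyError(f"No results for path: {path}")
        node = node[key]
    return node

print(read_path(debezium_value, "$.after.id"))  # prints 1

try:
    read_path(debezium_value, "$.id")
except KeyError as e:
    # Analogous to the PathNotFoundException in the trace below:
    # the root object only has "before", "after", "op" keys, no "id".
    print(e)
```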
Resulting Exception
ERROR WorkerSinkTask{id=tpmconnect-sink-connector-tenancy-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not evaluate JsonPath $.id (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Could not evaluate JsonPath $.id
at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:35)
at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy.generateId(ProvidedInValueStrategy.java:3)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.maybeInsertId(CosmosDBSinkTask.java:99)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:81)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['id']
at com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)
at com.jayway.jsonpath.JsonPath.read(JsonPath.java:185)
at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:89)
at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:78)
at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:32)
... 14 more
[2021-09-17 13:06:54,351] ERROR WorkerSinkTask{id=tpmconnect-sink-connector-tenancy-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Could not evaluate JsonPath $.id
at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:35)
at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy.generateId(ProvidedInValueStrategy.java:3)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.maybeInsertId(CosmosDBSinkTask.java:99)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:81)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
Caused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['id']
at com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)
at com.jayway.jsonpath.JsonPath.read(JsonPath.java:185)
at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:89)
at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:78)
at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:32)
... 14 more
Thank you for bringing this to our attention. When you set the path for the id field, what did you set it to? It looks to me like you've set the path to $["id"], which says to look for a field called "id" at the root of the document.
If my understanding of your document is correct, your id field exists elsewhere in the document:
{
"before": null,
"after": {
"id": 1,
......
}
}
Cosmos DB requires an id field at the root of the document. There is no way around that.
So you have some options here:
- Keep using ProvidedInValueStrategy and set the path to your id field as $.after.id so we know where to find the id. The connector should then take the value of that field and create a new "id" field at the root with that value, leaving you with two id fields.
- When writing the document to Kafka, set the key to the value of your id field, then use ProvidedInKeyStrategy. The connector will then read the value of the key and insert it as an "id" field at the root of your document, again leaving you with two id fields.
- Use an InsertField or InsertUUID SMT to create a new "id" field at the root and set it to anything you like. Again, two id fields.
- Use an SMT to read the value at a JSON path and chain an InsertField to create a new root-level id field with that value. Two id fields again.
You could then use an SMT to remove the original nested id field from the document if you wish, but you can also leave it there; Cosmos DB won't care about it, it only looks for an id field at the root.
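A sketch of what the first option might look like in the sink connector config. Note the path property name is an assumption here: "id.strategy.jsonPath" appears in the connector's PRs and discussions but, as noted later in this thread, it is not confirmed in released documentation:

```json
{
  "cosmos.id.strategy": "ProvidedInValueStrategy",
  "id.strategy.jsonPath": "$.after.id"
}
```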
Let me know if this helps.
Thank you for your response.
It seems none of the ID strategies I try work, and nothing I configure changes the final result. Is this feature working in 1.1.0?
For instance, when I select ProvidedInKeyStrategy, even though I know the message key (see above) contains an "id" value at its root, I get the same exception. FullKeyStrategy gives the same exception, as does KafkaMetadataStrategy. It is almost as if the configuration values don't matter or aren't read in by the connector; it always looks for the "id" JsonPath in the value.
Also, if I try to set up the Cosmos DB sink connector manually in Kafka Control Center, no strategies are listed in the drop-down field and it won't accept anything I type in. If I upload a JSON configuration file, it says the "ProvidedInKeyStrategy" class is not found, so I can't launch the connector through Control Center with any ID strategy other than the default.
Can you provide an example of what the JSON config should look like using ProvidedInKeyStrategy and the JsonPath value? I am not sure whether to use "connect.cosmos.id.strategy", "cosmos.id.strategy" or just "id.strategy", and "id.strategy.jsonpath" seems not to be documented anywhere other than in PRs and discussion boards.
I will change direction and try using an SMT to insert the ID directly into the root of the message value; however, I don't think the ID strategy feature is actually working as expected.
Just a follow-up: if I use an SMT to insert an "id" field into the root of the message value JSON payload, the sink connector works. However, every attempt I have made to use any of the documented ID strategies to read the "id" value out of the nested JSON path still fails for me.
I discovered that the Debezium SQL Server connector has an SMT to flatten the message payload, which moves the contents of "after" up to the root of the message. The root then includes the "id" field with the value I expect, which allows the Cosmos DB sink connector to work as intended.
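For reference, the flattening described above is Debezium's ExtractNewRecordState SMT, configured on the source connector. A minimal sketch (the transform alias "unwrap" is an arbitrary choice):

```json
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false"
}
```

With this in place, the record value is just the row ({"id": 1, "name": "tenant", ...}), so the sink's default $.id lookup succeeds.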
I can proceed with this workaround, but I am still very interested in how to solve this issue, whether it turns out to be an improperly configured sink connector or an actual bug in the ID strategy feature.
Thank you for your time.
Thanks Chris. Glad you have found a suitable workaround now using SMTs. We will need to take a look at the strategies to ensure they are working as expected.
ProvidedInValueStrategy and ProvidedInKeyStrategy do not appear to be working as expected. We need to be able to configure a path to the id field in both the key and the value. When configured, we should extract the value at that path and create a new id field at the root of the document.
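The expected behavior described above can be sketched as follows. This is illustrative Python, not the connector's Java implementation; the function names mirror those in the stack traces but are otherwise hypothetical:

```python
import json

def generate_id(record_json: str, json_path: str = "$.id") -> str:
    """Resolve a dot-separated JsonPath subset ($.a.b) against a JSON document."""
    node = json.loads(record_json)
    for key in json_path.lstrip("$.").split("."):
        node = node[key]
    return str(node)

def maybe_insert_id(value_json: str, json_path: str) -> str:
    """Copy the value found at json_path into a new root-level "id" field."""
    doc = json.loads(value_json)
    doc["id"] = generate_id(value_json, json_path)
    return json.dumps(doc)

# A trimmed Debezium-style envelope: the row id lives under "after".
event = '{"after": {"id": 1, "name": "tenant"}, "op": "c"}'
out = json.loads(maybe_insert_id(event, "$.after.id"))
print(out["id"])  # prints 1 (stored as the string "1")
```

This leaves two id fields, the new root-level one Cosmos DB requires and the original nested one, matching the options outlined earlier in the thread.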
Hello!
Any update?
When I execute the connector I get an exception when the message doesn't contain an id. I tried several configs but none of them worked.
Thanks.
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:615)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Could not evaluate JsonPath $.id\n\tat com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:35)\n\tat com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy.generateId(ProvidedInValueStrategy.java:3)\n\tat com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.maybeInsertId(CosmosDBSinkTask.java:181)\n\tat com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:118)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)\n\t... 
10 more\nCaused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['id']\n\tat com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)\n\tat com.jayway.jsonpath.JsonPath.read(JsonPath.java:185)\n\tat com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:89)\n\tat com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:78)\n\tat com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:32)\n\t... 14 more\n"