kafka-connect-cosmosdb

$id strategies do not appear to be working correctly.

Open ChrisWestermann opened this issue 2 years ago • 7 comments

Description

I have tried and read nearly everything I can find, but I cannot sink messages produced by the Debezium SQL Server Connector (debezium-connector-sqlserver:1.6.0). Every attempt fails with an error that the JsonPath $.id cannot be evaluated.

I have ensured the SQL table uses id as its primary key and is lower case.

I have tried "ProvidedInValueStrategy" and "ProvidedInKeyStrategy" as well as the default "FullKeyStrategy" ID strategies.

I can sink messages generated by the CosmosDB Source connector, but any attempt to sink messages from the SQL Server connector fails.

It may boil down to improper configuration, and I suspect the cause is how the SQL Server connector structures its messages, but I cannot identify what to change to fix this.

Expected Behavior

The CosmosDB Sink Connector should be able to consume messages produced from the Debezium SQL Server Connector.

Reproduce

SQL Server table create script

CREATE TABLE tenants (
    id INT PRIMARY KEY IDENTITY (1, 1),
    [name] text NOT NULL,
    customer_id UNIQUEIDENTIFIER NOT NULL
);
GO

EXEC sys.sp_cdc_enable_table  
@source_schema = N'dbo',  
@source_name   = N'tenants',  
@role_name     = NULL,  
@supports_net_changes = 1  
GO 

The sink connector is configured as follows and starts successfully once the parameters are evaluated:

    {
        "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
        "tasks.max": "1",
        "topics": "$TOPICS",
        "input.data.format": "JSON",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "connect.cosmos.connection.endpoint": "https://${TDM_COSMOS_NAME}.documents.azure.com:443/",
        "connect.cosmos.master.key": "$TDM_COSMOS_MKEY",
        "connect.cosmos.databasename": "$DBNAME",
        "connect.cosmos.containers.topicmap": "$MAP",
        "connect.cosmos.offset.useLatest": false,
        "cosmos.id.strategy": "ProvidedInValueStrategy"
    }

I also tried ProvidedInKeyStrategy and FullKeyStrategy in place of ProvidedInValueStrategy.

I have ensured that the SQL Server connector is also configured to use JsonConverter with schemas disabled.

Topic message key produced. The same error occurs even with ProvidedInKeyStrategy, although the key contains only the "id" field:

{ 
  "id": 1 
}

Topic message value produced. I know there is no root-level id, but this is what the Debezium SQL Server source connector produces:

{
 "before": null,
 "after": {
   "id": 1,
   "name": "tenant",
   "customer_id": "3189BE51-22A3-451E-A778-01C10439B769"
 },
 "source": {
   "version": "1.6.0.Final",
   "connector": "sqlserver",
   "name": "mssql",
   "ts_ms": 1631883768203,
   "snapshot": "false",
   "db": "tenancy",
   "sequence": null,
   "schema": "dbo",
   "table": "tenants",
   "change_lsn": "00000025:00000670:0037",
   "commit_lsn": "00000025:00000670:0038",
   "event_serial_no": 1
 },
 "op": "c",
 "ts_ms": 1631883773104,
 "transaction": null
}
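
To make the suspected structure problem concrete, here is a minimal Python sketch. It is not connector code: `root_id` only imitates a root-level `$.id` lookup with a plain dictionary, and `unwrap_after` is a hypothetical helper that keeps just the `after` row state. The conversion of the id to a string is an assumption about what the sink expects. The event is an abbreviated copy of the value above.

```python
import json

# Abbreviated copy of the Debezium change event value shown above.
envelope = json.loads("""
{
  "before": null,
  "after": {"id": 1, "name": "tenant",
            "customer_id": "3189BE51-22A3-451E-A778-01C10439B769"},
  "op": "c",
  "ts_ms": 1631883773104
}
""")


def root_id(document):
    """Imitate a root-level $.id lookup: return the value, or None if absent."""
    return document.get("id")


def unwrap_after(event):
    """Hypothetical helper: keep only the 'after' row state of a change event."""
    after = event.get("after")
    if after is None:
        # Delete events carry no 'after' state, so there is nothing to flatten.
        return None
    flattened = dict(after)
    # Assumption: the sink wants a string id, so the integer key is converted.
    flattened["id"] = str(flattened["id"])
    return flattened


print(root_id(envelope))                # None: the envelope has no root-level id
print(root_id(unwrap_after(envelope)))  # 1: the flattened row exposes id at the root
```

If this is indeed the cause, flattening would have to happen before the ID strategy runs. Debezium ships a single message transform for this kind of flattening, io.debezium.transforms.ExtractNewRecordState, but I have not confirmed that it resolves this error.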

Resulting Exception

ERROR WorkerSinkTask{id=tpmconnect-sink-connector-tenancy-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not evaluate JsonPath $.id (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Could not evaluate JsonPath $.id
    at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:35)
    at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy.generateId(ProvidedInValueStrategy.java:3)
    at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.maybeInsertId(CosmosDBSinkTask.java:99)
    at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:81)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['id']
    at com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)
    at com.jayway.jsonpath.JsonPath.read(JsonPath.java:185)
    at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:89)
    at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:78)
    at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:32)
    ... 14 more

[2021-09-17 13:06:54,351] ERROR WorkerSinkTask{id=tpmconnect-sink-connector-tenancy-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Could not evaluate JsonPath $.id
    at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:35)
    at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy.generateId(ProvidedInValueStrategy.java:3)
    at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.maybeInsertId(CosmosDBSinkTask.java:99)
    at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:81)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
Caused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['id']
    at com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)
    at com.jayway.jsonpath.JsonPath.read(JsonPath.java:185)
    at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:89)
    at com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:78)
    at com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInStrategy.generateId(ProvidedInStrategy.java:32)
    ... 14 more

ChrisWestermann, Sep 17 '21 13:09