kafka-connect-cosmosdb
Unhandled exception when JSON is not valid
When reading from the Cosmos DB Change Feed, if the original document already has an _lsn property, the ChangeFeedProcessor adds a second one, producing an invalid JSON document:
{
"name": "davide",
"surname": "mauri",
"id": "1",
"_rid": "tA4eAIlHRkMBAAAAAAAAAA==",
"_self": "dbs/tA4eAA==/colls/tA4eAIlHRkM=/docs/tA4eAIlHRkMBAAAAAAAAAA==/",
"_etag": "\"fa019428-0000-0700-0000-5d082aac0000\"",
"_attachments": "attachments/",
"_lsn": 5,
"_metadata": {},
"_ts": 1560816300,
"_lsn": 8,
"_metadata": {}
}
When the document is parsed as JSON, the code crashes. Here's the significant part of the stack trace:
java.lang.IllegalStateException: Unable to parse JSON {[...]}
at com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceResponse.fromJson(RxDocumentServiceResponse.java:202)
at com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceResponse.getQueryResponse(RxDocumentServiceResponse.java:155)
at com.microsoft.azure.cosmosdb.BridgeInternal.toChaneFeedResponsePage(BridgeInternal.java:74)
at com.microsoft.azure.cosmosdb.rx.internal.ChangeFeedQueryImpl.lambda$executeRequestAsync$2(ChangeFeedQueryImpl.java:154)
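For context, a JSON parser configured to reject duplicate keys will refuse such a document. This is only a sketch of the failure mode: the SDK's actual parser configuration isn't shown in this issue, but Jackson's duplicate detection reproduces the same kind of "unable to parse" error.

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DuplicateLsnDemo {
    // Jackson mapper with strict duplicate-key detection enabled.
    private static final ObjectMapper STRICT = new ObjectMapper()
            .enable(JsonParser.Feature.STRICT_DUPLICATE_DETECTION);

    /** Returns true if the JSON parses under strict duplicate detection. */
    public static boolean parsesStrict(String json) {
        try {
            STRICT.readTree(json);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Two "_lsn" keys, like the Change Feed document in this issue.
        String doc = "{\"id\":\"1\",\"_lsn\":5,\"_ts\":1560816300,\"_lsn\":8}";
        System.out.println(parsesStrict(doc)); // false: duplicate field "_lsn"
    }
}
```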
Sounds like a bug in the Cosmos DB Change Feed logic, either internally in their Java SDK or in our implementation of the Change Feed consumer.
Have you tracked down where this is happening?
I was able to reproduce this outside our connector source code by using this sample application. To do so, add the _lsn property with an integer value here.
However, I am able to get the source and sink connector working together, which should be the original use case behind this issue. The sink connector pushed the following message to Cosmos DB:
{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
and this is how it was added into CosmosDB:
{
"id": "siva-test.documents.azure.com_kafkaconnect_kafka..0",
"_etag": "\"020232ba-0000-0800-0000-5fd395cc0000\"",
"LeaseToken": "0",
"ContinuationToken": "\"2\"",
"timestamp": "2020-12-11T15:52:44.879159Z",
"Owner": "worker0",
"_rid": "hwkqAPqXZvcCAAAAAAAAAA==",
"_self": "dbs/hwkqAA==/colls/hwkqAPqXZvc=/docs/hwkqAPqXZvcCAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1607701964
}
Looks like the _lsn property wasn't added automatically.
In the source connector, I get back the following message in the topic:
"{\"Description\":\"Marriott description\",\"id\":\"h1\",\"HotelName\":\"Marriott\",\"_rid\":\"hwkqAK0U44oBAAAAAAAAAA==\",\"_self\":\"dbs/hwkqAA==/colls/hwkqAK0U44o=/docs/hwkqAK0U44oBAAAAAAAAAA==/\",\"_etag\":\"\\\"be007f6a-0000-0800-0000-5fd395be0000\\\"\",\"_attachments\":\"attachments/\",\"_ts\":1607701950,\"_lsn\":2}"
Now if you take that document out of Kafka again with the Sink connector and write it back to Cosmos DB you'll get it in Cosmos DB, with _lsn added ... Then, if you read that out, again, with the Source connector it'll fail because it already has _lsn in it and the Cosmos DB SDK tries to add it again.
A complicated, and unlikely, but not impossible, scenario.
The solution would be to remove _lsn (and the other system properties, since writing a document that contains system properties will probably fail anyway) before writing it back to Cosmos DB, using an SMT. You could also configure an SMT in the Source connector to remove all _* properties when reading, before writing to Kafka.
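For example, Kafka Connect's built-in ReplaceField transform can drop these fields on the sink side. A sketch of the connector configuration; the transform alias and the exact field list are placeholders:

```properties
# Drop Cosmos DB system properties before the sink writes the record back.
transforms=dropSystemProps
transforms.dropSystemProps.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# Older Connect versions use "blacklist" instead of "exclude".
transforms.dropSystemProps.exclude=_lsn,_rid,_self,_etag,_attachments,_metadata
```

The same transform configured on the Source connector would strip the properties before they ever reach the Kafka topic.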
Still, should raise this with Cosmos DB team to see if they can address this in their SDK.
We should also ensure that the Source connector handles this invalid JSON error correctly. Need to think about what "correctly" means here.
Can we log the error, dead-letter the document (if configured), and proceed with the next document instead of going into a failed state?
Thanks for your responses Ryan, especially over the weekend. I understand the scenario now and am able to properly reproduce it.
I think it should definitely be possible to remove the _lsn field using an SMT with the sink connector, so that the data going into Cosmos DB will never have this problematic field. Of course, someone can always create the item with the field manually (when they shouldn't), but that's where the Cosmos folks would need to help us out. For this reason, I went ahead and filed a bug in the SDK repo: https://github.com/Azure/azure-sdk-for-java/issues/18091
Since dead-letter queues only work for Sink connectors, I don't think that's a viable approach for this issue.
CSE Feedback filed: https://csefy19.visualstudio.com/CSEng/_workitems/edit/414040/
Design decision: filter out system properties
Will implement additional code in the Source connector to filter system properties from the Jackson object property bag before passing records to Kafka.
The exact list of properties is still to be defined, as some system properties might actually be useful, like _ts (should we drop _ts too?). _lsn, _rid, etc. are very unlikely to be useful to anyone, so we're OK to drop those.
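A minimal sketch of that filtering step with Jackson. The class name and the property set are illustrative; as noted above, the final list (e.g. whether _ts is kept) is still open.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.List;

public class SystemPropertyFilter {
    // Illustrative set of Cosmos DB system properties; the final list is undecided.
    private static final List<String> SYSTEM_PROPERTIES =
            List.of("_rid", "_self", "_etag", "_attachments", "_lsn", "_metadata");

    /** Removes system properties from the document's Jackson property bag in place. */
    public static ObjectNode stripSystemProperties(ObjectNode doc) {
        SYSTEM_PROPERTIES.forEach(doc::remove);
        return doc;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode doc = (ObjectNode) mapper.readTree(
                "{\"id\":\"h1\",\"HotelName\":\"Marriott\",\"_rid\":\"abc\",\"_lsn\":2}");
        System.out.println(stripSystemProperties(doc));
        // prints {"id":"h1","HotelName":"Marriott"}
    }
}
```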
Suggested workaround for now, until this is implemented:
use a Single Message Transform (as documented) to filter out the properties you want to drop,
OR
have Connect output data to a K-SQL stream and use filtering there to drop system properties before writing to Kafka.
Follow the pattern used by the Spark 3 connector, which has config options for including system properties and timestamps: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md
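The K-SQL workaround above could look roughly like this in ksqlDB; stream, topic, and column names are hypothetical and match the hotel document from earlier in this thread:

```sql
-- Declare a stream over the raw topic, listing only the business fields;
-- undeclared fields such as _lsn and _rid are simply not deserialized.
CREATE STREAM hotels_raw (id VARCHAR, HotelName VARCHAR, Description VARCHAR)
  WITH (KAFKA_TOPIC='hotels', VALUE_FORMAT='JSON');

-- Continuously write the cleaned records to a new topic.
CREATE STREAM hotels_clean
  WITH (KAFKA_TOPIC='hotels-clean') AS
  SELECT id, HotelName, Description FROM hotels_raw;
```

Because the declared schema omits the system properties, the clean topic never contains them, so round-tripping through the sink connector is safe.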