kafka-connect-cosmosdb

Unhandled exception when JSON is not valid

Open yorek opened this issue 5 years ago • 11 comments

When reading from the Cosmos DB Change Feed, if the original document already has a _lsn property, the ChangeFeedProcessor adds another one, which produces an invalid JSON document:

{
	"name": "davide",
	"surname": "mauri",
	"id": "1",
	"_rid": "tA4eAIlHRkMBAAAAAAAAAA==",
	"_self": "dbs/tA4eAA==/colls/tA4eAIlHRkM=/docs/tA4eAIlHRkMBAAAAAAAAAA==/",
	"_etag": "\"fa019428-0000-0700-0000-5d082aac0000\"",
	"_attachments": "attachments/",
	"_lsn": 5,
	"_metadata": {},
	"_ts": 1560816300,
	"_lsn": 8,
	"_metadata": {}
}

When the document is converted to JSON, the code crashes. Here's the relevant part of the stack trace:

java.lang.IllegalStateException: Unable to parse JSON {[...]}
	at com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceResponse.fromJson(RxDocumentServiceResponse.java:202)
	at com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceResponse.getQueryResponse(RxDocumentServiceResponse.java:155)
	at com.microsoft.azure.cosmosdb.BridgeInternal.toChaneFeedResponsePage(BridgeInternal.java:74)
	at com.microsoft.azure.cosmosdb.rx.internal.ChangeFeedQueryImpl.lambda$executeRequestAsync$2(ChangeFeedQueryImpl.java:154)
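
The failure mode can be illustrated outside the SDK. The sketch below is not the SDK's code; it assumes the parse error comes from a parser that insists on unique keys, and it uses Python's standard json module only because it is easy to run. The helper names reject_duplicates and parse_strict are hypothetical. A lenient parser quietly keeps the last _lsn, while a strict one rejects the document:

```python
import json


def reject_duplicates(pairs):
    """object_pairs_hook that raises on a repeated key instead of keeping the last value."""
    seen = {}
    for key, value in pairs:
        if key in seen:
            raise ValueError(f"duplicate key: {key!r}")
        seen[key] = value
    return seen


def parse_strict(raw):
    """Parse JSON text, treating duplicate object keys as an error."""
    return json.loads(raw, object_pairs_hook=reject_duplicates)


# Shortened version of the document from this issue: "_lsn" appears twice.
raw_doc = '{"id": "1", "_lsn": 5, "_ts": 1560816300, "_lsn": 8}'

# The default (lenient) parser accepts the text and keeps only the last "_lsn".
lenient = json.loads(raw_doc)

# The strict parser refuses the same text.
try:
    parse_strict(raw_doc)
    strict_error = None
except ValueError as exc:
    strict_error = str(exc)
```

Whether a given JSON library behaves like the lenient or the strict variant depends on its configuration, so the same document can pass through one component and crash the next.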

yorek avatar Jun 18 '19 00:06 yorek

Sounds like a bug in the Cosmos DB Change Feed logic, either internally in their Java SDK or in our implementation of the Change Feed consumer.

Have you tracked down where this is happening?

ryancrawcour avatar Jun 18 '19 20:06 ryancrawcour

Was able to reproduce this outside our connector source code by using this sample application. To do so, add the _lsn property with an integer value here

sivamu avatar Dec 11 '20 15:12 sivamu

However, I am able to get the source and sink connector working together, which should be the original use case behind this issue. The sink connector pushed the following message to Cosmos DB: {"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}

and this is how it was added into CosmosDB:

{
    "id": "siva-test.documents.azure.com_kafkaconnect_kafka..0",
    "_etag": "\"020232ba-0000-0800-0000-5fd395cc0000\"",
    "LeaseToken": "0",
    "ContinuationToken": "\"2\"",
    "timestamp": "2020-12-11T15:52:44.879159Z",
    "Owner": "worker0",
    "_rid": "hwkqAPqXZvcCAAAAAAAAAA==",
    "_self": "dbs/hwkqAA==/colls/hwkqAPqXZvc=/docs/hwkqAPqXZvcCAAAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1607701964
}

Looks like the _lsn property wasn't added automatically.

In the source connector, I get back the following message in the topic:

"{\"Description\":\"Marriott description\",\"id\":\"h1\",\"HotelName\":\"Marriott\",\"_rid\":\"hwkqAK0U44oBAAAAAAAAAA==\",\"_self\":\"dbs/hwkqAA==/colls/hwkqAK0U44o=/docs/hwkqAK0U44oBAAAAAAAAAA==/\",\"_etag\":\"\\\"be007f6a-0000-0800-0000-5fd395be0000\\\"\",\"_attachments\":\"attachments/\",\"_ts\":1607701950,\"_lsn\":2}"

sivamu avatar Dec 11 '20 16:12 sivamu

Now, if you take that document out of Kafka again with the Sink connector and write it back to Cosmos DB, it lands in Cosmos DB with _lsn included. Then, if you read it out again with the Source connector, it'll fail because the document already has _lsn in it and the Cosmos DB SDK tries to add it again.

A complicated, and unlikely, but not impossible, scenario.

The solution would be to remove _lsn (and the other system properties, since writing a document that contains system properties will probably fail anyway) before writing it back to Cosmos DB, using an SMT. You could also configure an SMT in the Source connector to remove all _* properties when reading, before writing to Kafka.

Still, we should raise this with the Cosmos DB team to see if they can address it in their SDK.
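
As an illustration of the "remove all _* properties" idea, here is a minimal Python sketch of the filtering logic. It is not an actual SMT (Kafka Connect transforms are Java classes configured on the connector); strip_system_properties is a hypothetical helper, and it only looks at top-level keys:

```python
def strip_system_properties(doc):
    """Return a copy of `doc` without top-level keys that start with an underscore."""
    return {key: value for key, value in doc.items() if not key.startswith("_")}


# The message the source connector produced earlier in this thread, as a dict.
from_topic = {
    "Description": "Marriott description",
    "id": "h1",
    "HotelName": "Marriott",
    "_rid": "hwkqAK0U44oBAAAAAAAAAA==",
    "_self": "dbs/hwkqAA==/colls/hwkqAK0U44o=/docs/hwkqAK0U44oBAAAAAAAAAA==/",
    "_etag": "\"be007f6a-0000-0800-0000-5fd395be0000\"",
    "_attachments": "attachments/",
    "_ts": 1607701950,
    "_lsn": 2,
}

cleaned = strip_system_properties(from_topic)
```

A blanket prefix rule like this would also drop any user-defined property whose name happens to start with an underscore, which is one reason to prefer an explicit list.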

ryancrawcour avatar Dec 11 '20 18:12 ryancrawcour

We should also ensure that the Source connector handles this invalid JSON error correctly. We need to think about what "correctly" means.

Can we log the error, dead-letter this document (if configured), and proceed with the next document instead of going into a failed state?
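
A rough sketch of the "log and move on" behaviour, in Python for brevity. It assumes the connector gets to see each document's raw text, which may not hold here: the stack trace shows the parse happening inside the SDK while it builds the response page. parse_batch and the logger name are hypothetical:

```python
import json
import logging

log = logging.getLogger("source-connector-sketch")


def parse_batch(raw_docs, parse=json.loads):
    """Parse each raw document; log and skip the ones that fail instead of raising."""
    records, skipped = [], []
    for index, raw in enumerate(raw_docs):
        try:
            records.append(parse(raw))
        except ValueError as exc:  # json.JSONDecodeError is a subclass of ValueError
            log.warning("skipping document %d: %s", index, exc)
            skipped.append(raw)
    return records, skipped


# The second entry is truncated on purpose, so it cannot be parsed.
batch = ['{"id": "h1"}', '{"id": "h2", ', '{"id": "h3"}']
records, skipped = parse_batch(batch)
```

The skipped list is where a dead-letter step could hook in, if one were available for this connector type.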

ryancrawcour avatar Dec 11 '20 18:12 ryancrawcour

Thanks for your responses Ryan, especially over the weekend. I understand the scenario now and am able to properly reproduce it.

I think it should definitely be possible to remove the _lsn field using an SMT with the sink connector, so that the data going into Cosmos DB never has this problematic field. Of course, someone can always create the item with the field manually (when they shouldn't), but that's where the Cosmos folks would need to help us out. For this reason, I went ahead and filed a bug in the SDK repo: https://github.com/Azure/azure-sdk-for-java/issues/18091

Since dead-letter-queues only work for Sink connectors, I don't think that's a viable approach for this issue.

sivamu avatar Dec 11 '20 20:12 sivamu

CSE Feedback filed: https://csefy19.visualstudio.com/CSEng/_workitems/edit/414040/

sivamu avatar Dec 16 '20 16:12 sivamu

Design decision: filter out system properties

brandynbrown avatar Feb 18 '21 20:02 brandynbrown

Will implement additional code in the source connector to filter system properties from the Jackson object property bag before passing records to Kafka.

The exact list of properties is still to be defined, as some system properties might actually be useful. For example, should we drop _ts too? _lsn, _rid, etc. are very unlikely to be useful to anyone, so we're OK to drop those.
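
To make the open question concrete, here is a minimal Python sketch of filtering against an explicit list, with _ts kept unless the caller opts out. The connector itself is Java and would do this on the Jackson object; the drop list, the function name and the keep_timestamp flag are all assumptions for illustration, since the thread leaves the exact set undecided:

```python
# Hypothetical drop list: the exact set of properties is still to be defined.
DROPPED_SYSTEM_PROPERTIES = frozenset(
    {"_lsn", "_rid", "_self", "_etag", "_attachments", "_metadata"}
)


def filter_system_properties(doc, keep_timestamp=True):
    """Return a copy of `doc` without the listed system properties.

    `_ts` survives unless keep_timestamp is False.
    """
    dropped = set(DROPPED_SYSTEM_PROPERTIES)
    if not keep_timestamp:
        dropped.add("_ts")
    return {key: value for key, value in doc.items() if key not in dropped}


doc = {"id": "1", "name": "davide", "_rid": "tA4eAIlHRkMBAAAAAAAAAA==", "_lsn": 5, "_ts": 1560816300}

with_ts = filter_system_properties(doc)
without_ts = filter_system_properties(doc, keep_timestamp=False)
```

An explicit list leaves user properties that start with an underscore untouched, at the cost of having to track any system property the service adds later.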

ryancrawcour avatar Feb 18 '21 21:02 ryancrawcour

Suggested workaround until this is implemented:

Use a Single Message Transform (as documented) to filter out the properties you don't want.

OR

Have Connect output data to a KSQL stream and then use filtering there to drop system properties before writing to Kafka.

ryancrawcour avatar Feb 18 '21 21:02 ryancrawcour

Follow the pattern used by the Spark 3 connector, which has configuration options for including system properties and timestamps: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md

ryancrawcour avatar May 24 '21 22:05 ryancrawcour