
Filtering records in the connector or ignoring tombstones

Stormhand opened this issue 4 years ago • 7 comments

Are there any plans to support Kafka Connect transformations in the connector? We have some Kafka topics where 75% of the messages are tombstone records, which are just null records coming from a CDC source. I'm looking for a way to filter them out, because in the VARIANT column of the staging table they end up as empty JSON records. This has a big performance impact: on a 35-million-record data set where 15 million are nulls, I still have to ingest them, and the table scan time and disk spilling are the same as for non-null records.

Thanks!

Stormhand avatar Feb 09 '21 14:02 Stormhand

If your source connector is by chance Debezium (guessing, because you mentioned CDC and we hit the same issue), it has a tombstones.on.delete option to avoid emitting tombstones, depending on your source.

dhuang avatar Feb 16 '21 17:02 dhuang

No, I'm using Maxwell for CDC, but the data is aggregated by Flink to remove duplicates before it reaches the Snowflake connector. Still, a filtering mechanism in the connector would be good.

Stormhand avatar Feb 17 '21 08:02 Stormhand

Any dev attention here? I think a message that contains only a NULL and no other information should be ignored even without an explicit filter. There is no way to use such records.

Stormhand avatar Feb 24 '21 12:02 Stormhand

I do not think that messages with a null value should be thrown away, as tombstone messages are very widely used in Kafka topics. They are not completely empty, as they can still have a key.

For example, when performing CDC, such a message could be used to delete a row in a downstream table (using Snowflake streams and tasks on the table created by the Snowflake Kafka connector). In fact, this is a very important use case for our project.

That said, returning an empty JSON record as the RECORD_CONTENT is perhaps not the right approach; a JSON null value would be more appropriate.

What keeps us from setting content[0] = NullNode.getInstance(), i.e., setting the record content to a JSON null value, in the no-args constructor of SnowflakeRecordContent?

anekdoti avatar Apr 19 '21 12:04 anekdoti

Do you have a working query applying CDC changes in Snowflake? That would be extremely expensive here. But you're right that, at the very least, a tombstone shouldn't be converted to an empty JSON record, which, by the way, I managed to find only by comparing it as a string: WHERE RECORD_CONTENT != '{}'.
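
For what it's worth, a minimal sketch of that kind of filtering in Snowflake, assuming a hypothetical staging table KAFKA_STAGING with the RECORD_METADATA / RECORD_CONTENT columns the connector creates:

-- KAFKA_STAGING is a hypothetical table name; RECORD_CONTENT is the VARIANT column.
-- Tombstones currently land as empty JSON objects, so the only way to exclude
-- them today is to compare the content as text.
SELECT RECORD_METADATA, RECORD_CONTENT
FROM KAFKA_STAGING
WHERE TO_VARCHAR(RECORD_CONTENT) != '{}';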

Stormhand avatar Apr 19 '21 13:04 Stormhand

What do you mean by expensive in your specific case, and what is the main factor? How many changes do you have to handle per second?

Indeed, we have a CDC pipeline (currently just insert + update) using Snowflake streams and tasks. I have no real performance data yet, but so far I have not observed anything negative.
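
For illustration, a rough sketch of that streams-and-tasks pattern, including tombstone-driven deletes. All names are hypothetical, and it assumes tombstones would arrive with a NULL RECORD_CONTENT (rather than '{}') and that the Kafka key is exposed in RECORD_METADATA:

-- Hypothetical names throughout; assumes at most one change per key per batch
-- (otherwise the source would need deduplication first, as in the later sketch).
CREATE STREAM IF NOT EXISTS RAW_EVENTS_STREAM ON TABLE RAW_EVENTS;

CREATE TASK IF NOT EXISTS APPLY_CDC
  WAREHOUSE = ETL_WH
  SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('RAW_EVENTS_STREAM')
AS
MERGE INTO TARGET_TABLE t
USING (
  SELECT RECORD_METADATA:key::string AS id, RECORD_CONTENT
  FROM RAW_EVENTS_STREAM
) s
ON t.id = s.id
WHEN MATCHED AND s.RECORD_CONTENT IS NULL THEN DELETE                 -- tombstone
WHEN MATCHED THEN UPDATE SET t.payload = s.RECORD_CONTENT
WHEN NOT MATCHED AND s.RECORD_CONTENT IS NOT NULL THEN
  INSERT (id, payload) VALUES (s.id, s.RECORD_CONTENT);

-- Tasks are created suspended, so remember: ALTER TASK APPLY_CDC RESUME;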

anekdoti avatar Apr 19 '21 16:04 anekdoti

Well, consider that I'm applying 10 million changes hourly. I might have thousands of changes for a single unique id, and half of them might be nulls. I need only the last update; I don't care about the deletes. The MERGE operation, for example, takes the same time whether it processes real updates or empty JSON records. By expensive I mean both credits and time, because of the delete operations where I might later have to insert/update the same unique id that was deleted by a tombstone. P.S. An option in the connector config would be the best solution, I think.
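
For illustration, the kind of pre-filtering and deduplication meant here, with hypothetical names: keep only the latest change per key and drop tombstones before the MERGE.

-- KAFKA_STAGING and the metadata fields follow the connector's table layout;
-- the tombstone filter depends on how tombstones show up ('{}' today).
SELECT RECORD_METADATA:key::string AS id, RECORD_CONTENT
FROM KAFKA_STAGING
WHERE TO_VARCHAR(RECORD_CONTENT) != '{}'
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY RECORD_METADATA:key::string
  ORDER BY RECORD_METADATA:offset::number DESC
) = 1;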

Stormhand avatar Apr 19 '21 16:04 Stormhand

Closing this out due to the age of the issue - please reopen if this is still a concern.

sfc-gh-rcheng avatar Jul 31 '23 19:07 sfc-gh-rcheng

This should filter tombstones and fix the issue:

"transforms": "FilterEmptyMessages",
"transforms.FilterEmptyMessages.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.FilterEmptyMessages.predicate": "FilterEmpty",
"predicates": "FilterEmpty",
"predicates.FilterEmpty.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
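
Note that the Filter transform and the predicates mechanism it relies on were introduced in Kafka Connect 2.6 (KIP-585); on older Connect workers these classes are not available.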

vascoferraz avatar Nov 16 '23 12:11 vascoferraz

Tombstone handling is also configurable as of this commit, after v2.1.0 of the Kafka connector, for Snowpipe Streaming!

sfc-gh-rcheng avatar Nov 16 '23 17:11 sfc-gh-rcheng