
KSQL should warn users if they specify non-StringConverter for keys when creating connectors

Open rmoff opened this issue 6 years ago • 3 comments

Since KSQL currently supports only string keys, it should make sure users have a 'happy path' when using Connect.

✅ It already uses StringConverter if the user doesn't specify a key.converter in their CREATE CONNECTOR statement.

⚠️ If a user specifies an Avro key.converter, KSQL won't be able to use the message key. As is well known, that causes lots of problems for joins etc. Users are likely to copy and paste configuration from existing connectors or from examples they've found, and that configuration may well include key.converter.

💡 Suggestion: parse the CREATE CONNECTOR input and, if the user has specified a key.converter other than StringConverter, issue a warning (or even fail the statement? How aggressive do we want to be?)
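A minimal sketch of the suggested check, assuming the WITH clause of CREATE CONNECTOR has already been parsed into a plain dict of property names to string values. The function name `check_key_converter` and the warning text are hypothetical and not part of KSQL. The sketch also applies the existing default (StringConverter when no key.converter is given):

```python
from __future__ import annotations

STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter"


def check_key_converter(props: dict[str, str]) -> tuple[dict[str, str], list[str]]:
    """Hypothetical check: default key.converter to StringConverter, warn otherwise.

    `props` stands in for the parsed WITH-clause properties (an assumption).
    Returns the resolved properties and a list of warning messages.
    """
    resolved = dict(props)  # copy so the caller's dict is not mutated
    warnings: list[str] = []
    converter = resolved.get("key.converter")
    if converter is None:
        # Existing behaviour: no key.converter given, so use StringConverter.
        resolved["key.converter"] = STRING_CONVERTER
    elif converter.strip() != STRING_CONVERTER:
        # Proposed behaviour: flag any non-string key converter.
        warnings.append(
            f"key.converter is set to {converter}; KSQL only supports string "
            "keys, so it may not be able to use the message key."
        )
    return resolved, warnings


# Usage: a config copied from an example that uses an Avro key converter.
props, warnings = check_key_converter(
    {"key.converter": "io.confluent.connect.avro.AvroConverter"}
)
for w in warnings:
    print("WARNING:", w)
```

Raising an exception instead of appending to `warnings` would implement the stricter "fail the statement" option.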

rmoff, Oct 11 '19 11:10

When I created a JDBC connector, the StringConverter was set automagically, but for the Debezium connector it isn't.

So, if it doesn't set it automagically, then it should :)

rmoff, Oct 11 '19 11:10

Hmm, this may not be as straightforward as I thought. With Debezium and StringConverter, the key ends up as Struct{id=43}:

10/11/19 11:58:50 AM UTC, Struct{id=43}, {"id": 43, "first_name": "Rick", "last_name": "Astley", "email": null, "gender": null, "club_status": null, "comments": null, "create_ts": "2019-10-11T11:58:49Z", "update_ts": "2019-10-11T11:58:49Z", "messagetopic": "dbz_asgard.demo.CUSTOMERS", "messagesource": "Debezium CDC from MySQL on asgard"}

rmoff, Oct 11 '19 12:10

So Debezium, unlike the JDBC source connector, sets the message key itself based on the PK of the source table. But letting it do that with 'key.converter'= 'org.apache.kafka.connect.storage.StringConverter' results in a string key of Struct{id=43} rather than 43, which is what KSQL needs.

So, counter-intuitively, the user has to add the SMTs themselves because KSQL doesn't?

            'transforms.ksqlCreateKey.type'= 'org.apache.kafka.connect.transforms.ValueToKey',
            'transforms.ksqlCreateKey.fields'= 'id',
            'transforms.ksqlExtractString.field'= 'id',
            'transforms.ksqlExtractString.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key');
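To see why the key renders as Struct{id=43} and what the two SMTs above change, here is a toy Python model. It is not Kafka Connect code. It assumes StringConverter serializes a key by calling its toString(), and that Connect's Struct prints as Struct{name=value,...}, skipping null fields. `Struct`, `string_converter`, `value_to_key` and `extract_field_key` are hypothetical stand-ins for Struct, StringConverter, ValueToKey and ExtractField$Key:

```python
from __future__ import annotations


class Struct:
    """Toy stand-in for Connect's Struct (illustration only)."""

    def __init__(self, **fields: object) -> None:
        self.fields = dict(fields)  # insertion order preserved

    def get(self, name: str) -> object:
        return self.fields[name]

    def __str__(self) -> str:
        # Mimics Struct.toString(): "Struct{a=1,b=2}", null fields omitted.
        parts = [f"{k}={v}" for k, v in self.fields.items() if v is not None]
        return "Struct{" + ",".join(parts) + "}"


def string_converter(key: object) -> bytes | None:
    # Toy StringConverter: serialize whatever it is given via its string form.
    return None if key is None else str(key).encode("utf-8")


def value_to_key(value: Struct, fields: list[str]) -> Struct:
    # Toy ValueToKey: build a new key Struct from the listed value fields.
    return Struct(**{f: value.get(f) for f in fields})


def extract_field_key(key: Struct, field: str) -> object:
    # Toy ExtractField$Key: replace the key with one of its fields.
    return key.get(field)


value = Struct(id=43, first_name="Rick", last_name="Astley", email=None)

# Debezium's own key, serialized directly: the Struct's string form.
print(string_converter(Struct(id=43)))  # b'Struct{id=43}'

# ksqlCreateKey (ValueToKey) then ksqlExtractString (ExtractField$Key).
key = extract_field_key(value_to_key(value, ["id"]), "id")
print(string_converter(key))  # b'43'
```

In a real connector, the aliases (ksqlCreateKey, ksqlExtractString) also have to be listed, in order, in the connector's `transforms` property for the SMTs to run.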

rmoff, Oct 11 '19 12:10