KSQL should warn users if they specify non-StringConverter for keys when creating connectors
Since KSQL only supports string keys (currently), it should ensure that users have a 'happy path' using Connect.
✅ It already uses StringConverter if the user doesn't specify a key.converter in their CREATE CONNECTOR statement
⚠️ If a user specifies a key.converter of Avro, then KSQL won't be able to use the message key, which is well known to cause lots of problems for joins etc. It is likely that users will copy and paste configuration from existing connectors or examples that they've found, which may well include key.converter
💡 Suggestion: parse the CREATE CONNECTOR input and if the user has specified key.converter which is not StringConverter issue a warning (or even fail the statement? how aggressive do we want to be?)
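To make the suggestion concrete, here is a minimal sketch of the check in Python. It is not KSQL's implementation: the function name `check_key_converter` and the warning text are hypothetical, and it assumes the connector properties have already been parsed out of the CREATE CONNECTOR statement into a plain map.

```python
# Hypothetical sketch of the proposed check; not KSQL's actual code.
STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter"


def check_key_converter(config):
    """Return (effective_config, warnings) for a parsed connector property map."""
    effective = dict(config)  # copy so the caller's map is left untouched
    warnings = []
    converter = effective.get("key.converter")
    if converter is None:
        # Existing behaviour described above: default to StringConverter.
        effective["key.converter"] = STRING_CONVERTER
    elif converter != STRING_CONVERTER:
        # Proposed behaviour: warn (a stricter variant could raise instead).
        warnings.append(
            "key.converter is set to %s; KSQL currently supports only string "
            "keys, so the message key may be unusable (for example in joins)."
            % converter
        )
    return effective, warnings
```

Returning the warnings as a list, rather than raising, leaves the "warn or fail?" question open: the caller can decide how aggressive to be.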
When I created a JDBC connector the StringConverter was set automagically, but for the Debezium connector it isn't.
So, if it doesn't set it automagically, then it should :)
Hmm this is maybe not as straightforward as I thought. For Debezium with StringConverter the key ends up as Struct{id=43}
10/11/19 11:58:50 AM UTC, Struct{id=43}, {"id": 43, "first_name": "Rick", "last_name": "Astley", "email": null, "gender": null, "club_status": null, "comments": null, "create_ts": "2019-10-11T11:58:49Z", "update_ts": "2019-10-11T11:58:49Z", "messagetopic": "dbz_asgard.demo.CUSTOMERS", "messagesource": "Debezium CDC from MySQL on asgard"}
So Debezium, unlike the JDBC source connector, sets the message key itself based on the PK of the source table. But letting it do that with 'key.converter' = 'org.apache.kafka.connect.storage.StringConverter' results in a string key of Struct{id=43} rather than 43, which is what KSQL needs.
So, counter-intuitively, the user has to add the SMT themselves because KSQL doesn't?
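The difference between the two keys can be illustrated with a small Python simulation. This is only a model of the behaviour observed above, not Kafka Connect code: the `Struct` class, `string_converter`, and `extract_field` are hypothetical stand-ins, assuming the converter simply stringifies whatever key it is handed and that an extract-field step pulls one field out of the struct first.

```python
# Hypothetical simulation of the observed behaviour; not Kafka Connect code.
class Struct:
    """Stand-in for a structured key such as the one Debezium builds from the PK."""

    def __init__(self, **fields):
        self.fields = fields

    def __str__(self):
        body = ",".join("%s=%s" % (k, v) for k, v in self.fields.items())
        return "Struct{" + body + "}"


def string_converter(key):
    # Assumption: the converter just stringifies the key it receives.
    return str(key)


def extract_field(key, field):
    # Assumption: models an extract-field transform applied to the key.
    return key.fields[field]


key = Struct(id=43)
print(string_converter(key))                       # Struct{id=43}
print(string_converter(extract_field(key, "id")))  # 43
```

Under these assumptions, the converter alone cannot produce the bare 43: the field has to be extracted from the struct before the key is stringified.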
'transforms.ksqlCreateKey.type'= 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.ksqlCreateKey.fields'= 'id',
'transforms.ksqlExtractString.field'= 'id',
'transforms.ksqlExtractString.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key');