[mysql][oracle] Quote primary key when splitting snapshot
MySqlSnapshotSplitReadTask throws an exception when the primary key name conflicts with a keyword or reserved word.
sample table:
CREATE TABLE quote_pk_table (
`key` VARCHAR(10) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number INTEGER DEFAULT ' 123 '
);
exception:
682590 [debezium-reader-2] ERROR com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Execute snapshot read task for mysql split MySqlSnapshotSplit{tableId=customer_1wfwad8.quote_pk_table, splitId='customer_1wfwad8.quote_pk_table:2', splitKeyType=[`key` VARCHAR(10) NOT NULL], splitStart=[10006], splitEnd=[10009], highWatermark=null} fail
io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table customer_1wfwad8.quote_pk_table failed
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$0(SnapshotSplitReader.java:130) ~[classes/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_292]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table customer_1wfwad8.quote_pk_table failed
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEventsForTable(MySqlSnapshotSplitReadTask.java:265) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEvents(MySqlSnapshotSplitReadTask.java:193) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:154) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:115) ~[classes/:?]
... 7 more
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'key >= '10006' AND NOT (key = '10009') AND key <= '10009'' at line 1
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) ~[mysql-connector-java-8.0.27.jar:8.0.27]
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.27.jar:8.0.27]
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[mysql-connector-java-8.0.27.jar:8.0.27]
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1009) ~[mysql-connector-java-8.0.27.jar:8.0.27]
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEventsForTable(MySqlSnapshotSplitReadTask.java:230) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEvents(MySqlSnapshotSplitReadTask.java:193) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:154) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:115) ~[classes/:?]
... 7 more
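The predicate in the SQLSyntaxErrorException above uses `key` without quoting, which MySQL parses as a reserved word. A minimal sketch of the idea behind the fix follows. The class and method names (`QuotedSplitQuery`, `quote`, `buildSplitScanQuery`) are hypothetical and are not the connector's actual code; the predicate shape is copied from the error message.

```java
/** Hypothetical helper for illustration only; not the connector's real implementation. */
final class QuotedSplitQuery {

    /** Wraps a MySQL identifier in backticks, doubling any embedded backtick. */
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /**
     * Builds a bounded split scan query with the split key quoted everywhere it appears.
     * The three placeholders are split start, split end, split end, matching the
     * predicate shape seen in the error message.
     */
    static String buildSplitScanQuery(String database, String table, String splitKey) {
        String key = quote(splitKey);
        return "SELECT * FROM " + quote(database) + "." + quote(table)
                + " WHERE " + key + " >= ?"
                + " AND NOT (" + key + " = ?)"
                + " AND " + key + " <= ?";
    }
}
```

With the sample table, `QuotedSplitQuery.buildSplitScanQuery("customer_1wfwad8", "quote_pk_table", "key")` produces a WHERE clause that refers to `` `key` `` instead of the bare word, so the statement should parse.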
@leonardBang PTAL
@huhuan1898 Thanks. Would you rebase onto the master branch?
@ruanhang1993 I have rebased onto the master branch. The failing TiDB test case looks confusing.
Thanks @huhuan1898 for the great work! Could you please rebase it onto the latest master branch, since there have been many changes in the Flink CDC repo since your original commit? A kind reminder that the com.ververica.cdc.connectors package has been moved to org.apache.flink.cdc.connectors.
Also, I noticed that the PostgreSQL and SQL Server connectors have similar expressions. Should they be quoted, too?
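For reference, the quoting character differs per database, so each connector would need its own rule. The sketch below is only an illustration under that assumption; `IdentifierQuoting` and `Dialect` are hypothetical names and do not exist in the Flink CDC code base.

```java
/** Hypothetical per-dialect identifier quoting; names are illustrative only. */
final class IdentifierQuoting {

    enum Dialect { MYSQL, POSTGRESQL, ORACLE, SQL_SERVER }

    static String quote(Dialect dialect, String identifier) {
        switch (dialect) {
            case MYSQL:
                // MySQL: backticks, an embedded backtick is doubled.
                return "`" + identifier.replace("`", "``") + "`";
            case SQL_SERVER:
                // SQL Server: square brackets, an embedded closing bracket is doubled.
                return "[" + identifier.replace("]", "]]") + "]";
            case POSTGRESQL:
                // PostgreSQL: double quotes, an embedded double quote is doubled.
                return "\"" + identifier.replace("\"", "\"\"") + "\"";
            case ORACLE:
                // Oracle: double quotes; identifiers cannot contain a double quote at all.
                if (identifier.indexOf('"') >= 0) {
                    throw new IllegalArgumentException("Oracle identifiers cannot contain '\"'");
                }
                return "\"" + identifier + "\"";
            default:
                throw new IllegalArgumentException("Unsupported dialect: " + dialect);
        }
    }
}
```

One caveat: in PostgreSQL and Oracle, quoting an identifier also makes it case-sensitive, so the quoted name has to match the case stored in the catalog.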
@yuxiqian I've resolved all the comments. The test case failures confused me for a long time. Considering that this commit is actually a minor change, those failures may not be caused by my changes. I'm considering splitting it into several commits. Could you give me some suggestions?
Hi @huhuan1898, apologies for the unstable CI in recent days. +1 for creating individual commits to fix each connector, which keeps the commit history cleaner.
cc @ruanhang1993
@yuxiqian The Oracle test cases do not seem very stable, but the same test cases for the other databases look fine. I'm planning to submit them separately and to look into the Oracle test failures when I have time; the errors look confusing.