flink-cdc

[mysql][oracle] Quote primary key when split snapshot

Open huhuan1898 opened this issue 3 years ago • 8 comments

MySqlSnapshotSplitReadTask throws an exception when the primary key column name conflicts with a MySQL keyword or reserved word, because the split key is not quoted in the generated snapshot query.

sample table:

CREATE TABLE quote_pk_table (
  `key` VARCHAR(10) NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number INTEGER DEFAULT ' 123 '
);

exception:

682590 [debezium-reader-2] ERROR com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Execute snapshot read task for mysql split MySqlSnapshotSplit{tableId=customer_1wfwad8.quote_pk_table, splitId='customer_1wfwad8.quote_pk_table:2', splitKeyType=[`key` VARCHAR(10) NOT NULL], splitStart=[10006], splitEnd=[10009], highWatermark=null} fail
io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table customer_1wfwad8.quote_pk_table failed
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$0(SnapshotSplitReader.java:130) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_292]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table customer_1wfwad8.quote_pk_table failed
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEventsForTable(MySqlSnapshotSplitReadTask.java:265) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEvents(MySqlSnapshotSplitReadTask.java:193) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:154) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:115) ~[classes/:?]
	... 7 more
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'key >= '10006' AND NOT (key = '10009') AND key <= '10009'' at line 1
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) ~[mysql-connector-java-8.0.27.jar:8.0.27]
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.27.jar:8.0.27]
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[mysql-connector-java-8.0.27.jar:8.0.27]
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1009) ~[mysql-connector-java-8.0.27.jar:8.0.27]
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEventsForTable(MySqlSnapshotSplitReadTask.java:230) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEvents(MySqlSnapshotSplitReadTask.java:193) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:154) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:115) ~[classes/:?]
	... 7 more

huhuan1898 • Nov 07 '22
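The failing statement in the log shows the split key emitted bare (key >= '10006' AND NOT (key = '10009') AND key <= '10009'). Because KEY is a reserved word in MySQL, the parser rejects the statement. Here is a minimal sketch of the fix, assuming a single-column split key: every identifier is wrapped in backticks, with any embedded backtick doubled, before the range predicate is built. QuotedSplitQuerySketch, quoteMySqlIdentifier and buildSplitScanQuery are hypothetical names for illustration, not the actual Flink CDC API. The predicate shape only mirrors the one visible in the log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; class and method names are illustrative, not Flink CDC API.
public class QuotedSplitQuerySketch {

    // Wraps a MySQL identifier in backticks; an embedded backtick is escaped by doubling it.
    static String quoteMySqlIdentifier(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    // Builds the scan statement for one split of a single-column split key.
    // Boundary values are bound later as JDBC parameters ('?'). The range shape mirrors
    // the failing statement in the log: key >= start AND NOT (key = end) AND key <= end.
    static String buildSplitScanQuery(String database, String table, String splitKey,
                                      boolean hasStart, boolean hasEnd) {
        String col = quoteMySqlIdentifier(splitKey);
        List<String> conditions = new ArrayList<>();
        if (hasStart) {
            conditions.add(col + " >= ?");
        }
        if (hasEnd) {
            conditions.add("NOT (" + col + " = ?)");
            conditions.add(col + " <= ?");
        }
        StringBuilder sql = new StringBuilder("SELECT * FROM ")
                .append(quoteMySqlIdentifier(database)).append('.')
                .append(quoteMySqlIdentifier(table));
        if (!conditions.isEmpty()) {
            sql.append(" WHERE ").append(String.join(" AND ", conditions));
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // prints: SELECT * FROM `customer_1wfwad8`.`quote_pk_table` WHERE `key` >= ? AND NOT (`key` = ?) AND `key` <= ?
        System.out.println(buildSplitScanQuery("customer_1wfwad8", "quote_pk_table", "key", true, true));
    }
}
```

For a split bounded on both sides, this produces three placeholders, which would be bound in order as start, end, end. The first and last splits simply omit the missing bound.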

@leonardBang PTAL

huhuan1898 • Nov 07 '22

@huhuan1898 Thanks. Would you rebase onto the master branch?

ruanhang1993 • Jul 05 '23

@ruanhang1993 I have rebased onto the master branch. The failing TiDB test case looks confusing.

huhuan1898 • Aug 30 '23

Thanks @huhuan1898 for the great work! Could you please rebase it onto the latest master branch, since there have been lots of changes in the Flink CDC repo since your original commit? Kind reminder: the com.ververica.cdc.connectors package has been moved to org.apache.flink.cdc.connectors.

Also, I noticed that the PostgreSQL and SQL Server connectors build similar split-key expressions. Should they be quoted, too?

yuxiqian • Apr 25 '24
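On the PostgreSQL and SQL Server question: each dialect has its own delimited-identifier syntax, so MySQL backticks cannot simply be reused. The sketch below uses a hypothetical Dialect enum and quote method, neither of which is part of Flink CDC:

- PostgreSQL uses double quotes, with an embedded double quote doubled.
- SQL Server uses square brackets, with an embedded ] written as ]].
- Oracle uses double quotes but does not allow a double quote inside an identifier at all.

Each connector should also account for case sensitivity. In PostgreSQL and Oracle a quoted identifier is case-sensitive, so the name must be passed exactly as stored in the catalog. That is lowercase by default in PostgreSQL and uppercase in Oracle.

```java
// Hypothetical sketch; the enum and method are illustrative, not Flink CDC API.
public class IdentifierQuotingSketch {

    enum Dialect { POSTGRES, SQLSERVER, ORACLE }

    // Returns the delimited form of a column or table name for the given dialect.
    static String quote(Dialect dialect, String name) {
        switch (dialect) {
            case POSTGRES:
                // Standard double-quoted identifier; an embedded " is written as "".
                return "\"" + name.replace("\"", "\"\"") + "\"";
            case SQLSERVER:
                // Bracket-delimited identifier; an embedded ] is written as ]].
                return "[" + name.replace("]", "]]") + "]";
            case ORACLE:
                // Oracle identifiers may not contain a double quote, so reject it
                // instead of trying to escape it.
                if (name.contains("\"")) {
                    throw new IllegalArgumentException("Invalid Oracle identifier: " + name);
                }
                return "\"" + name + "\"";
            default:
                throw new IllegalArgumentException("Unsupported dialect: " + dialect);
        }
    }

    public static void main(String[] args) {
        // Prints a quoted split-key predicate for each dialect.
        for (Dialect d : Dialect.values()) {
            System.out.println(d + ": " + quote(d, "key") + " >= ?");
        }
    }
}
```

Keeping the quoting in one per-dialect method means the split-query builders only call quote(...) and never concatenate raw column names.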

@yuxiqian I've resolved all the comments. The test failures have confused me for a long time. Since this commit is actually a minor change, those failures may not be caused by my changes. I'm considering splitting it into several commits. Could you give me some suggestions?

huhuan1898 • Aug 06 '24

Hi @huhuan1898, apologies for the unstable CI in recent days. +1 for creating individual commits that fix each connector, to keep the commit history cleaner.

cc @ruanhang1993

yuxiqian • Aug 07 '24

@yuxiqian The Oracle test cases don't seem very stable, but the same test cases for the other databases look fine. I'm planning to submit the fixes separately and will try to solve the Oracle test problems when I have time. The errors look confusing.

huhuan1898 • Aug 07 '24