[FLINK-38139] Fix job failure caused by consecutive online schema changes.
Fix a potential exception thrown when an online schema change happens.
Why did this happen? Because the original table is not locked while its schema is being changed. An online schema change consists of a set of SQL statements, and the next statements may be issued before the previous set has been fully processed. As a result, the table structures may end up not matching each other.
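To make the "set of SQL statements" concrete: a gh-ost run first alters a ghost table (named like _tb1_gho, as in the question below) and later swaps it in with a RENAME, typically renaming the original table to _tb1_del. The sketch below is hypothetical: GhostDdlKind and GhostDdlClassifier are illustrative names, not Flink CDC classes, and the regular expressions are simplified assumptions about gh-ost's DDL. It only shows how each statement could be labeled as starting or completing an online schema change.

```java
import java.util.regex.Pattern;

/** Hypothetical labels for DDL seen in the binlog during a gh-ost run. */
enum GhostDdlKind { START, CUT_OVER, OTHER }

/** Illustrative classifier; patterns are simplified assumptions about gh-ost naming. */
final class GhostDdlClassifier {
    // ALTER on a ghost table such as db._tb1_gho (optionally backtick-quoted,
    // optionally carrying a /* gh-ost */ comment): the schema change starts here.
    private static final Pattern ALTER_GHOST = Pattern.compile(
            "(?i)^\\s*alter\\s+(?:/\\*.*?\\*/\\s*)?table\\s+(?:`?\\w+`?\\.)?`?_(\\w+)_gho`?\\b.*");

    // RENAME that moves the ghost table into place: the schema change completes here.
    private static final Pattern RENAME_GHOST = Pattern.compile(
            "(?i)^\\s*rename\\s+(?:/\\*.*?\\*/\\s*)?table\\s+.*`?_(\\w+)_gho`?\\s+to\\s+.*");

    static GhostDdlKind classify(String ddl) {
        if (ALTER_GHOST.matcher(ddl).matches()) {
            return GhostDdlKind.START;
        }
        if (RENAME_GHOST.matcher(ddl).matches()) {
            return GhostDdlKind.CUT_OVER;
        }
        return GhostDdlKind.OTHER;
    }
}
```

For example, classify("alter /* gh-ost */ table db._tb1_gho add column c varchar(255)") yields START, while a plain "alter table db.tb1 add column c int" yields OTHER. Everything that arrives between a START and its CUT_OVER is where the mismatch described above can arise.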
What if a gh-ost process is interrupted after executing alter /* gh-ost */ table db._tb1_gho add column c varchar(255), and after a while another gh-ost job is initiated on the same table? Would adding a unit test to cover this scenario be useful?
Maybe we could add a unit test like this: the first gh-ost change is canceled, so downstream does not receive its schema change event; the second gh-ost change succeeds, and downstream receives the schema change event of the second change.
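The proposed test can be expressed as an event replay. This is a minimal sketch under stated assumptions: events are pre-classified (no SQL parsing), the leftover ghost table from the first run is dropped before the second run recreates it, and a new CREATE of the ghost table discards DDLs cached by the interrupted run. GhostRunScenario, its event kinds, and the second run's "add column d int" are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of "first gh-ost run canceled, second run succeeds". */
final class GhostRunScenario {
    enum Kind { CREATE_GHOST, ALTER_GHOST, CUT_OVER }

    static final class Event {
        final Kind kind;
        final String table;
        final String ddl;

        Event(Kind kind, String table, String ddl) {
            this.kind = kind;
            this.table = table;
            this.ddl = ddl;
        }
    }

    /** Replays events and returns the DDLs that would be forwarded downstream. */
    static List<String> replay(List<Event> events) {
        Map<String, List<String>> pending = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Event e : events) {
            switch (e.kind) {
                case CREATE_GHOST:
                    // Assumption: a new ghost table starts a new run, so stale DDLs are discarded.
                    pending.put(e.table, new ArrayList<>());
                    break;
                case ALTER_GHOST:
                    // Cache the DDL until the cut-over confirms the change.
                    pending.computeIfAbsent(e.table, k -> new ArrayList<>()).add(e.ddl);
                    break;
                case CUT_OVER:
                    // Only a completed run releases its cached DDLs.
                    List<String> ddls = pending.remove(e.table);
                    if (ddls != null) {
                        emitted.addAll(ddls);
                    }
                    break;
            }
        }
        return emitted;
    }

    static List<Event> canceledThenSucceeded() {
        List<Event> events = new ArrayList<>();
        // First run: interrupted after the ALTER; no cut-over ever happens.
        events.add(new Event(Kind.CREATE_GHOST, "db.tb1", "create table db._tb1_gho like db.tb1"));
        events.add(new Event(Kind.ALTER_GHOST, "db.tb1",
                "alter /* gh-ost */ table db._tb1_gho add column c varchar(255)"));
        // Second run on the same table completes with a cut-over.
        events.add(new Event(Kind.CREATE_GHOST, "db.tb1", "create table db._tb1_gho like db.tb1"));
        events.add(new Event(Kind.ALTER_GHOST, "db.tb1",
                "alter /* gh-ost */ table db._tb1_gho add column d int"));
        events.add(new Event(Kind.CUT_OVER, "db.tb1",
                "rename /* gh-ost */ table db.tb1 to db._tb1_del, db._tb1_gho to db.tb1"));
        return events;
    }
}
```

replay(canceledThenSucceeded()) returns only the second run's ALTER, matching the expectation above. A real test would drive the MySQL source end to end rather than this in-memory model.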
Tables with an AUTO_INCREMENT primary key id are not supported: gh-ost generates the two schema change DDLs shown below, and we need to emit the first one. Additionally, we might consider adding test cases for tables with an AUTO_INCREMENT primary key id.
16598 [Source Data Fetcher for Source: MySQL Source -> SchemaOperator -> PrePartition (1/1)#0] INFO org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Received the start event of online schema change: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1763457658, file=mysql-bin.000003, pos=393080, gtids=df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1088, server_id=223344}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=customer_rsjl2b}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1763457658483,db=customer_rsjl2b,table=customers,server_id=223344,gtid=df0b83c5-c45f-11f0-ac64-eab2010302a6:1089,file=mysql-bin.000003,pos=392886,row=0},historyRecord={
"source" : {
"file" : "mysql-bin.000003",
"pos" : 392886,
"server_id" : 223344
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1763457658,
"file" : "mysql-bin.000003",
"pos" : 393080,
"gtids" : "df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1088",
"server_id" : 223344
},
"databaseName" : "customer_rsjl2b",
"ddl" : "alter /* gh-ost */ table `customer_rsjl2b`.`customers` add column ext int first",
"tableChanges" : [ ]
}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. Save it for later.
16599 [Source Data Fetcher for Source: MySQL Source -> SchemaOperator -> PrePartition (1/1)#0] INFO org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Received the start event of online schema change: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1763457658, file=mysql-bin.000003, pos=394428, gtids=df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1092, server_id=223344}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=customer_rsjl2b}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1763457658486,db=customer_rsjl2b,table=customers,server_id=223344,gtid=df0b83c5-c45f-11f0-ac64-eab2010302a6:1093,file=mysql-bin.000003,pos=394239,row=0},historyRecord={
"source" : {
"file" : "mysql-bin.000003",
"pos" : 394239,
"server_id" : 223344
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1763457658,
"file" : "mysql-bin.000003",
"pos" : 394428,
"gtids" : "df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1092",
"server_id" : 223344
},
"databaseName" : "customer_rsjl2b",
"ddl" : "alter /* gh-ost */ table `customer_rsjl2b`.`customers` AUTO_INCREMENT=5699",
"tableChanges" : [ ]
}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. Save it for later.
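In the logs above, the first DDL adds a column while the second only sets the AUTO_INCREMENT table option. The hypothetical helper below (not part of Flink CDC) tells the two apart. Its regular expression is an assumption that covers the exact form seen in these logs, and filtering out option-only DDLs is one possible policy, not necessarily what the fix does.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper for gh-ost's AUTO_INCREMENT-only DDL. */
final class AutoIncrementDdl {
    // Matches e.g. "alter /* gh-ost */ table `db`.`t` AUTO_INCREMENT=5699" and nothing more.
    private static final Pattern AUTO_INCREMENT_ONLY = Pattern.compile(
            "(?i)^\\s*alter\\s+(?:/\\*.*?\\*/\\s*)?table\\s+\\S+\\s+auto_increment\\s*=\\s*\\d+\\s*;?\\s*$");

    static boolean isAutoIncrementOnly(String ddl) {
        return AUTO_INCREMENT_ONLY.matcher(ddl).matches();
    }

    /** Keeps DDLs that change more than the AUTO_INCREMENT option, preserving order. */
    static List<String> schemaChanging(List<String> ddls) {
        return ddls.stream().filter(d -> !isAutoIncrementOnly(d)).collect(Collectors.toList());
    }
}
```

Applied to the two logged DDLs, schemaChanging keeps only "alter /* gh-ost */ table `customer_rsjl2b`.`customers` add column ext int first".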
Minor improvement: use a List to cache multiple DDL events for online schema changes. https://github.com/beryllw/flink-cdc/commit/c621f7b6172d919679813ae81f2f453387b2a3f0
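A minimal sketch of the List-based caching idea, assuming DDLs are keyed by a table identifier string and released at cut-over. PendingDdlCache and its methods are hypothetical and are not taken from the linked commit. With a single saved event per table, the AUTO_INCREMENT DDL would overwrite the ADD COLUMN one; appending to a list preserves both in arrival order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-table cache of online schema change DDLs awaiting cut-over. */
final class PendingDdlCache {
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Called for each start event; appends instead of replacing earlier DDLs. */
    void save(String tableId, String ddl) {
        pending.computeIfAbsent(tableId, k -> new ArrayList<>()).add(ddl);
    }

    /** Called at cut-over; returns cached DDLs in arrival order and clears them. */
    List<String> drain(String tableId) {
        List<String> ddls = pending.remove(tableId);
        return ddls == null ? Collections.<String>emptyList() : ddls;
    }

    /** Number of DDLs currently cached for the table. */
    int pendingCount(String tableId) {
        List<String> ddls = pending.get(tableId);
        return ddls == null ? 0 : ddls.size();
    }
}
```

Keeping arrival order matters because a later DDL in the same gh-ost run may build on an earlier one; draining on cut-over also means nothing is left behind for the next run on that table.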