
[FLINK-38139] Fix consecutive online schema change causes job failure.

Open • lvyanquan opened this issue 5 months ago • 2 comments

Fix a potential exception when an online schema change happens.

Why did this happen? When an online schema change is made, the original table is not locked, and the change corresponds to a set of SQL statements rather than a single one. The schema change was issued before that whole set had been processed, so the table structure can end up not corresponding.
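To make the "set of SQL statements" concrete: gh-ost does not alter the original table directly but works on helper tables named `_<table>_gho` (ghost), `_<table>_ghc` (changelog) and `_<table>_del` (old copy), so the statements of one change are spread over several table names. Below is a minimal sketch of mapping such a helper table back to the original table. The class and method names are hypothetical and are not part of flink-cdc.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GhostTableNames {
    // gh-ost helper tables: _<table>_gho (ghost), _<table>_ghc (changelog), _<table>_del (old copy).
    private static final Pattern HELPER = Pattern.compile("^_(.+)_(gho|ghc|del)$");

    /** Returns the original table name for a gh-ost helper table, or null for an ordinary table. */
    public static String originalTableOf(String tableName) {
        Matcher m = HELPER.matcher(tableName);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(originalTableOf("_tb1_gho")); // tb1
        System.out.println(originalTableOf("tb1"));      // null
    }
}
```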

lvyanquan, Jul 24 '25 02:07

What if a gh-ost process is interrupted after executing ``alter /* gh-ost */ table db._tb1_gho add column c varchar(255)``, and after a while another gh-ost job is initiated on the same table? Would adding a unit test to cover this scenario be useful?

Maybe we could add a unit test like this: the first gh-ost change is canceled, and downstream does not receive a schema change event. The second gh-ost change succeeds, and downstream receives the schema change event of the second change only.
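The scenario can be sketched outside of Flink as a small simulation. Everything here is hypothetical (the class, the three callbacks, and the second DDL); it assumes that a new gh-ost run (re)creates the ghost table and that the final rename is the point at which the change should be emitted. It is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InterruptedGhostRunSketch {
    // Pending DDL per original table; a hypothetical stand-in for the reader's cache.
    private final Map<String, String> pending = new HashMap<>();
    // Schema changes that would be sent downstream.
    private final List<String> emitted = new ArrayList<>();

    /** Assumption: a new run (re)creates the ghost table, so leftovers of an older run are dropped. */
    public void onGhostTableCreated(String table) {
        pending.remove(table);
    }

    /** Saves the DDL seen on the ghost table, already rewritten to target the original table. */
    public void onGhostTableAltered(String table, String ddlOnOriginalTable) {
        pending.put(table, ddlOnOriginalTable);
    }

    /** Assumption: the final rename (cut-over) is when the change becomes visible downstream. */
    public void onCutOver(String table) {
        String ddl = pending.remove(table);
        if (ddl != null) {
            emitted.add(ddl);
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        InterruptedGhostRunSketch reader = new InterruptedGhostRunSketch();

        // First run: ghost table altered, then gh-ost is interrupted (no cut-over).
        reader.onGhostTableCreated("db.tb1");
        reader.onGhostTableAltered("db.tb1", "alter table db.tb1 add column c varchar(255)");
        System.out.println(reader.emitted()); // []

        // Second run on the same table completes; only its DDL is emitted.
        reader.onGhostTableCreated("db.tb1");
        reader.onGhostTableAltered("db.tb1", "alter table db.tb1 add column d int");
        reader.onCutOver("db.tb1");
        System.out.println(reader.emitted()); // [alter table db.tb1 add column d int]
    }
}
```

A real unit test would drive the same sequence through the MySQL source and assert on the schema change events received downstream.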

beryllw, Nov 13 '25 08:11

Tables with an AUTO_INCREMENT primary key `id` are not supported yet: for such a table, gh-ost generates two schema change DDLs, as shown in the log below. We need to emit the first one. Additionally, we might consider adding test cases for tables with an AUTO_INCREMENT primary key `id`.

16598 [Source Data Fetcher for Source: MySQL Source -> SchemaOperator -> PrePartition (1/1)#0] INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Received the start event of online schema change: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1763457658, file=mysql-bin.000003, pos=393080, gtids=df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1088, server_id=223344}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=customer_rsjl2b}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1763457658483,db=customer_rsjl2b,table=customers,server_id=223344,gtid=df0b83c5-c45f-11f0-ac64-eab2010302a6:1089,file=mysql-bin.000003,pos=392886,row=0},historyRecord={
  "source" : {
    "file" : "mysql-bin.000003",
    "pos" : 392886,
    "server_id" : 223344
  },
  "position" : {
    "transaction_id" : null,
    "ts_sec" : 1763457658,
    "file" : "mysql-bin.000003",
    "pos" : 393080,
    "gtids" : "df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1088",
    "server_id" : 223344
  },
  "databaseName" : "customer_rsjl2b",
  "ddl" : "alter /* gh-ost */ table `customer_rsjl2b`.`customers` add column ext int first",
  "tableChanges" : [ ]
}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. Save it for later.
16599 [Source Data Fetcher for Source: MySQL Source -> SchemaOperator -> PrePartition (1/1)#0] INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Received the start event of online schema change: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1763457658, file=mysql-bin.000003, pos=394428, gtids=df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1092, server_id=223344}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=customer_rsjl2b}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1763457658486,db=customer_rsjl2b,table=customers,server_id=223344,gtid=df0b83c5-c45f-11f0-ac64-eab2010302a6:1093,file=mysql-bin.000003,pos=394239,row=0},historyRecord={
  "source" : {
    "file" : "mysql-bin.000003",
    "pos" : 394239,
    "server_id" : 223344
  },
  "position" : {
    "transaction_id" : null,
    "ts_sec" : 1763457658,
    "file" : "mysql-bin.000003",
    "pos" : 394428,
    "gtids" : "df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1092",
    "server_id" : 223344
  },
  "databaseName" : "customer_rsjl2b",
  "ddl" : "alter /* gh-ost */ table `customer_rsjl2b`.`customers` AUTO_INCREMENT=5699",
  "tableChanges" : [ ]
}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. Save it for later.

Minor improvement: use a List to cache multiple DDL events for online schema changes. See https://github.com/beryllw/flink-cdc/commit/c621f7b6172d919679813ae81f2f453387b2a3f0
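As a rough illustration of the idea (not the code in the linked commit), a per-table list keeps every DDL of one online schema change in arrival order instead of letting a later one overwrite an earlier one. `PendingDdlCache`, `save` and `drain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingDdlCache {
    // One ordered list of pending DDL statements per table identifier.
    private final Map<String, List<String>> pendingByTable = new HashMap<>();

    /** Appends a DDL to the pending list of the given table, keeping arrival order. */
    public void save(String tableId, String ddl) {
        pendingByTable.computeIfAbsent(tableId, k -> new ArrayList<>()).add(ddl);
    }

    /** Removes and returns everything cached for the table; an empty list if nothing was cached. */
    public List<String> drain(String tableId) {
        List<String> ddls = pendingByTable.remove(tableId);
        return ddls == null ? Collections.emptyList() : ddls;
    }

    public static void main(String[] args) {
        PendingDdlCache cache = new PendingDdlCache();
        String table = "customer_rsjl2b.customers";
        // The two DDLs from the log above.
        cache.save(table, "alter /* gh-ost */ table `customer_rsjl2b`.`customers` add column ext int first");
        cache.save(table, "alter /* gh-ost */ table `customer_rsjl2b`.`customers` AUTO_INCREMENT=5699");

        System.out.println(cache.drain(table).size());    // 2
        System.out.println(cache.drain(table).isEmpty()); // true
    }
}
```

With both statements retained, the caller can decide which of them to forward, for example the `add column` DDL.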

beryllw, Nov 18 '25 09:11