flink-cdc
[Bug][mysql] Removing all previously captured tables and adding only newly added tables throws an exception.
Search before asking
- [X] I searched in the issues and found nothing similar.
Flink version
1.18
Flink CDC version
3.0.1
Database and its version
Any version.
Minimal reproduce step
- Stop the job with a savepoint.
- Keep 'scan.incremental.snapshot.enabled' = 'true' and restart from the savepoint with a tableList containing only tables that were not captured in the previous run.
- The assigner state then becomes inconsistent. Take the following test case as an example:
public class NewlyAddedTableITCase extends MySqlSourceTestBase {

    @Test
    public void testRemoveAndAddTablesOneByOne() throws Exception {
        testRemoveAndAddTablesOneByOne(
                1, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
            throws Exception {
        MySqlConnection connection = getConnection();
        // step 1: create mysql tables with all tables included
        initialAddressTables(connection, captureAddressTables);

        final TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();

        // get all expected data
        List<String> fetchedDataList = new ArrayList<>();
        String finishedSavePointPath = null;
        // test removing and adding table one by one
        for (int round = 0; round < captureAddressTables.length; round++) {
            String captureTableThisRound = captureAddressTables[round];
            String cityName = captureTableThisRound.split("_")[1];
            StreamExecutionEnvironment env =
                    getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            String createTableStatement =
                    getCreateTableStatement(new HashMap<>(), captureTableThisRound);
            tEnv.executeSql(createTableStatement);
            tEnv.executeSql(
                    "CREATE TABLE sink ("
                            + " table_name STRING,"
                            + " id BIGINT,"
                            + " country STRING,"
                            + " city STRING,"
                            + " detail_address STRING,"
                            + " primary key (table_name,id) not enforced"
                            + ") WITH ("
                            + " 'connector' = 'values',"
                            + " 'sink-insert-only' = 'false'"
                            + ")");
            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
            JobClient jobClient = tableResult.getJobClient().get();

            // this round's snapshot data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
                                    captureTableThisRound, cityName, cityName)));
            waitForSinkSize("sink", fetchedDataList.size());
            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
            // only this round table's data is captured.

            // step 3: make binlog data for all tables before this round (also includes this round)
            for (int i = 0; i <= round; i++) {
                String tableName = captureAddressTables[i];
                makeBinlogForAddressTable(connection, tableName, round);
            }

            // this round's binlog data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "-U[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
                                    captureTableThisRound, round, cityName, cityName),
                            format(
                                    "+I[%s, %d, China, %s, %s West Town address 4]",
                                    captureTableThisRound,
                                    417022095255614380L + round,
                                    cityName,
                                    cityName)));

            // step 4: assert fetched binlog data in this round
            waitForSinkSize("sink", fetchedDataList.size());
            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

            // step 5: trigger savepoint
            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
            jobClient.cancel().get();
        }
    }
    // setting primary key as id rather than <id, city> is more realistic.
    private String getCreateTableStatement(
            Map<String, String> otherOptions, String... captureTableNames) {
        return format(
                "CREATE TABLE address ("
                        + " table_name STRING METADATA VIRTUAL,"
                        + " id BIGINT NOT NULL,"
                        + " country STRING,"
                        + " city STRING,"
                        + " detail_address STRING,"
                        + " primary key (id) not enforced"
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s',"
                        + " 'database-name' = '%s',"
                        + " 'table-name' = '%s',"
                        + " 'scan.incremental.snapshot.chunk.size' = '2',"
                        + " 'server-time-zone' = 'UTC',"
                        + " 'server-id' = '%s',"
                        + " 'scan.newly-added-table.enabled' = 'true'"
                        + " %s"
                        + ")",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customDatabase.getUsername(),
                customDatabase.getPassword(),
                customDatabase.getDatabaseName(),
                getTableNameRegex(captureTableNames),
                getServerId(),
                otherOptions.isEmpty()
                        ? ""
                        : ","
                                + otherOptions.entrySet().stream()
                                        .map(
                                                e ->
                                                        String.format(
                                                                "'%s'='%s'",
                                                                e.getKey(), e.getValue()))
                                        .collect(Collectors.joining(",")));
    }
}
What did you expect to see?
The test passes: the job restores from the savepoint and captures the newly added tables correctly.
What did you see instead?
The following exception occurs:
org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to offer finished split information, this should not be called
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:379) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleLatestFinishedSplitNumberRequest(MySqlSourceEnumerator.java:324) ~[classes/:?]
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:170) ~[classes/:?]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleSourceEvent(SourceCoordinator.java:590) ~[flink-runtime-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:297) ~[flink-runtime-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469) ~[flink-runtime-1.18.0.jar:1.18.0]
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.18.0.jar:1.18.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_362]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_362]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_362]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_362]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_362]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_362]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]
Reason
When the job is restarted with only newly added tables, MySqlBinlogSplit#filterOutdatedSplitInfos filters out the FinishedSnapshotSplitInfo entries of the previously captured tables. Because none of the old tables are captured any more, the list of FinishedSnapshotSplitInfo becomes empty.
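For illustration, a minimal sketch of what that filtering amounts to (previousInfos and currentCapturedTables are assumed names for illustration, not copied from the source):

// Sketch only: keep finished-split info for tables that are still captured.
// "previousInfos" and "currentCapturedTables" are hypothetical names.
List<FinishedSnapshotSplitInfo> remaining = new ArrayList<>();
for (FinishedSnapshotSplitInfo info : previousInfos) {
    if (currentCapturedTables.contains(info.getTableId())) {
        remaining.add(info);
    }
}
// remaining is empty when every previously captured table has been removed from the table list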
Then, when the binlog split is added back to the split reader, com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#configureFilter (and hence BinlogSplitReader#shouldEmit) treats the empty list of FinishedSnapshotSplitInfo as if this were a binlog-only job:
        // specific offset mode
        if (finishedSplitInfos.isEmpty()) {
            for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
                tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
            }
        }
        // initial mode
        else {
            for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) {
                TableId tableId = finishedSplitInfo.getTableId();
                List<FinishedSnapshotSplitInfo> list =
                        splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
                list.add(finishedSplitInfo);
                splitsInfoMap.put(tableId, list);

                BinlogOffset highWatermark = finishedSplitInfo.getHighWatermark();
                BinlogOffset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
                if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
                    tableIdBinlogPositionMap.put(tableId, highWatermark);
                }
            }
        }
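The branch above registers every table at the binlog split's starting offset, even though the newly added tables still need a snapshot phase. The exception itself is raised by the guard in MySqlSnapshotSplitAssigner#getFinishedSplitInfos (see the stack trace), which refuses to hand out finished split information while the assigner is still in the snapshot phase for the newly added tables. A rough sketch of that guard (paraphrased, not the exact source; waitingForFinishedSplits() and the finishedSplitInfos field are assumed names):

// Sketch (paraphrased) of the guard that raises the exception above.
public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
    if (waitingForFinishedSplits()) {
        throw new FlinkRuntimeException(
                "The assigner is not ready to offer finished split information, this should not be called");
    }
    return finishedSplitInfos;
}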
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
CC @leonardBang, @ruanhang1993
Closing this issue as it has been migrated to Apache Jira.