
[Bug][mysql] Removing all previously captured tables and adding only newly added tables throws an exception.


Search before asking

  • [X] I searched in the issues and found nothing similar.

Flink version

1.18

Flink CDC version

3.0.1

Database and its version

Any version

Minimal reproduce step

  1. Stop the job with a savepoint.
  2. Set 'scan.incremental.snapshot.enabled' = 'true' and restart from the savepoint with a tableList that contains only tables not included last time.
  3. The assigner status then becomes chaotic. Take the following test case as an example:
public class NewlyAddedTableITCase extends MySqlSourceTestBase {
    @Test
    public void testRemoveAndAddTablesOneByOne() throws Exception {
        testRemoveAndAddTablesOneByOne(
                1, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
            throws Exception {

        MySqlConnection connection = getConnection();
        // step 1: create mysql tables with all tables included
        initialAddressTables(connection, captureAddressTables);

        final TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();

        // get all expected data
        List<String> fetchedDataList = new ArrayList<>();

        String finishedSavePointPath = null;
        // test removing and adding table one by one
        for (int round = 0; round < captureAddressTables.length; round++) {
            String captureTableThisRound = captureAddressTables[round];
            String cityName = captureTableThisRound.split("_")[1];
            StreamExecutionEnvironment env =
                    getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            String createTableStatement =
                    getCreateTableStatement(new HashMap<>(), captureTableThisRound);
            tEnv.executeSql(createTableStatement);
            tEnv.executeSql(
                    "CREATE TABLE sink ("
                            + " table_name STRING,"
                            + " id BIGINT,"
                            + " country STRING,"
                            + " city STRING,"
                            + " detail_address STRING,"
                            + " primary key (table_name,id) not enforced"
                            + ") WITH ("
                            + " 'connector' = 'values',"
                            + " 'sink-insert-only' = 'false'"
                            + ")");
            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
            JobClient jobClient = tableResult.getJobClient().get();

            // this round's snapshot data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
                                    captureTableThisRound, cityName, cityName)));
            waitForSinkSize("sink", fetchedDataList.size());
            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

            // only this round's table data is captured.
            // step 3: make binlog data for all tables up to and including this round
            for (int i = 0; i <= round; i++) {
                String tableName = captureAddressTables[i];
                makeBinlogForAddressTable(connection, tableName, round);
            }
            // this round's binlog data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "-U[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
                                    captureTableThisRound, round, cityName, cityName),
                            format(
                                    "+I[%s, %d, China, %s, %s West Town address 4]",
                                    captureTableThisRound,
                                    417022095255614380L + round,
                                    cityName,
                                    cityName)));

            // step 4: assert fetched binlog data in this round
            waitForSinkSize("sink", fetchedDataList.size());

            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
            // step 5: trigger savepoint
            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
            jobClient.cancel().get();
        }
    }

    // setting the primary key as id rather than <id, city> is more realistic.
    private String getCreateTableStatement(
            Map<String, String> otherOptions, String... captureTableNames) {
        return format(
                "CREATE TABLE address ("
                        + " table_name STRING METADATA VIRTUAL,"
                        + " id BIGINT NOT NULL,"
                        + " country STRING,"
                        + " city STRING,"
                        + " detail_address STRING,"
                        + " primary key (id) not enforced"
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s',"
                        + " 'database-name' = '%s',"
                        + " 'table-name' = '%s',"
                        + " 'scan.incremental.snapshot.chunk.size' = '2',"
                        + " 'server-time-zone' = 'UTC',"
                        + " 'server-id' = '%s',"
                        + " 'scan.newly-added-table.enabled' = 'true'"
                        + " %s"
                        + ")",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customDatabase.getUsername(),
                customDatabase.getPassword(),
                customDatabase.getDatabaseName(),
                getTableNameRegex(captureTableNames),
                getServerId(),
                otherOptions.isEmpty()
                        ? ""
                        : ","
                        + otherOptions.entrySet().stream()
                        .map(
                                e ->
                                        String.format(
                                                "'%s'='%s'",
                                                e.getKey(), e.getValue()))
                        .collect(Collectors.joining(",")));
    }

}

What did you expect to see?

The test should pass.

What did you see instead?

An exception occurs:

org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to offer finished split information, this should not be called
	at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:379) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleLatestFinishedSplitNumberRequest(MySqlSourceEnumerator.java:324) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:170) ~[classes/:?]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleSourceEvent(SourceCoordinator.java:590) ~[flink-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:297) ~[flink-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469) ~[flink-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.18.0.jar:1.18.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_362]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_362]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_362]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_362]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_362]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_362]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]

Reason

When the job is restarted with newly added tables, MySqlBinlogSplit#filterOutdatedSplitInfos filters out the previous tables' FinishedSnapshotSplitInfo entries. In this case, the list of FinishedSnapshotSplitInfo becomes empty.
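To illustrate the effect, here is a minimal, self-contained sketch (hypothetical simplified types and method, not the connector's real FinishedSnapshotSplitInfo or the real filterOutdatedSplitInfos signature): when the newly captured tables no longer overlap the previously captured ones, the filtering leaves nothing behind.

import java.util.List;
import java.util.stream.Collectors;

public class FilterOutdatedSplitInfosSketch {

    // Hypothetical stand-in for FinishedSnapshotSplitInfo: only the table id matters here.
    record FinishedInfo(String tableId) {}

    // Keep only the infos whose table is still captured, mirroring the idea (not the code)
    // of MySqlBinlogSplit#filterOutdatedSplitInfos.
    static List<FinishedInfo> filterOutdated(List<FinishedInfo> infos, List<String> capturedTables) {
        return infos.stream()
                .filter(info -> capturedTables.contains(info.tableId()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The previous run's finished snapshot splits covered only address_hangzhou.
        List<FinishedInfo> previous = List.of(new FinishedInfo("customer.address_hangzhou"));
        // After restarting from the savepoint, only the newly added table is captured.
        List<String> captured = List.of("customer.address_beijing");

        // No overlap, so the result is empty, which BinlogSplitReader#configureFilter
        // later treats as a binlog-only job.
        System.out.println(filterOutdated(previous, captured)); // prints []
    }
}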

Then, when the binlog split is added back to the split reader, com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#configureFilter and BinlogSplitReader#shouldEmit see an empty list of FinishedSnapshotSplitInfo and treat the job as a binlog-only job:

        // specific offset mode
        if (finishedSplitInfos.isEmpty()) {
            for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
                tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
            }
        }
        // initial mode
        else {
            for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) {
                TableId tableId = finishedSplitInfo.getTableId();
                List<FinishedSnapshotSplitInfo> list =
                        splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
                list.add(finishedSplitInfo);
                splitsInfoMap.put(tableId, list);

                BinlogOffset highWatermark = finishedSplitInfo.getHighWatermark();
                BinlogOffset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
                if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
                    tableIdBinlogPositionMap.put(tableId, highWatermark);
                }
            }
        }
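To make the consequence of the first branch concrete, here is a small self-contained sketch (hypothetical names and a placeholder offset string, not the connector's real types): with an empty finishedSplitInfos list, every table in the binlog split is keyed to the split's starting offset instead of a per-table snapshot high watermark, so the newly added table is handled as if it had no snapshot phase at all.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmptyFinishedInfosSketch {
    public static void main(String[] args) {
        // finishedSplitInfos is empty because filterOutdatedSplitInfos dropped the old tables,
        // so the "specific offset mode" branch above runs for the newly added table.
        List<String> tablesInBinlogSplit = List.of("customer.address_beijing");
        String startingOffset = "mysql-bin.000003/4"; // placeholder for the binlog split's starting offset

        Map<String, String> tableIdBinlogPositionMap = new HashMap<>();
        for (String tableId : tablesInBinlogSplit) {
            tableIdBinlogPositionMap.put(tableId, startingOffset);
        }

        // Prints {customer.address_beijing=mysql-bin.000003/4}: no snapshot high watermark
        // is tracked for the table, matching the binlog-only behaviour described above.
        System.out.println(tableIdBinlogPositionMap);
    }
}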

Are you willing to submit a PR?

  • [x] I'm willing to submit a PR!

loserwang1024 · Jan 31 '24 09:01

CC @leonardBang, @ruanhang1993

loserwang1024 · Jan 31 '24 09:01

Closing this issue as it has been migrated to Apache Jira.

PatrickRen · Apr 09 '24 05:04