[Bug][sqlserver] SqlServer incremental source cannot support exactly-once
Search before asking
- [X] I searched in the issues and found nothing similar.
Flink version
1.18
Flink CDC version
3.0
Reason
overview
First, let's look at what the SqlServer incremental source does to guarantee exactly-once semantics in parallel reads. The table is split into multiple chunks based on the key, and each chunk becomes a snapshot split. These splits can be read in parallel.

In the snapshot phase, for each snapshot split between [low_key, high_key]:
- use SqlServerDialect#displayCurrentOffset to get the current LSN as low_watermark
- read the snapshot data between [low_key, high_key] via a JDBC query and keep it as a temporary state state1
- use SqlServerDialect#displayCurrentOffset to get the current LSN as high_watermark
- read the log between (low_watermark, high_watermark) and apply it to the temporary state state1 to produce the final state as of high_watermark, then emit it downstream

Then in the stream phase, we read the log in [high_watermark, +∞) for this split between [low_key, high_key]. A rough sketch of this per-split algorithm is shown below.
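The following is a minimal, pseudocode-style sketch of the per-split algorithm described above. Every type and helper in it (queryCurrentLsn, readSnapshotChunk, replayChangeLog) is an illustrative placeholder, not an actual connector API:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the per-split backfill algorithm; placeholders only, not connector code.
public class SnapshotSplitSketch {

    /** Reads one snapshot split [lowKey, highKey] with watermark-based backfill. */
    List<String> readSnapshotSplit(long lowKey, long highKey) {
        // 1. Take the low watermark (SqlServerDialect#displayCurrentOffset in the connector).
        long lowWatermark = queryCurrentLsn();

        // 2. Read the chunk [lowKey, highKey] with a plain JDBC query -> temporary state1.
        Map<Long, String> state1 = readSnapshotChunk(lowKey, highKey);

        // 3. Take the high watermark after the chunk has been read.
        long highWatermark = queryCurrentLsn();

        // 4. Backfill: replay the log in (lowWatermark, highWatermark) onto state1 so the
        //    emitted rows are consistent as of highWatermark. The stream phase later resumes
        //    from highWatermark for this split, so no change should be emitted twice --
        //    provided the watermarks really are the latest LSN of the whole database.
        replayChangeLog(state1, lowWatermark, highWatermark, lowKey, highKey);

        return new ArrayList<>(state1.values());
    }

    // --- placeholders; in the real connector these are JDBC / CDC table reads ---
    private long queryCurrentLsn() {
        return 0L;
    }

    private Map<Long, String> readSnapshotChunk(long lowKey, long highKey) {
        return new TreeMap<>();
    }

    private void replayChangeLog(
            Map<Long, String> state, long low, long high, long lowKey, long highKey) {}
}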
problem
However, SqlServerDialect#displayCurrentOffset → SqlServerUtils#currentLsn → SqlServerConnection#getMaxTransactionLsn returns the max LSN of the system table cdc.lsn_time_mapping, which is not the real latest LSN of the whole database. In this incremental source framework, only the real latest LSN of the whole database can guarantee exactly-once semantics.
As shown in https://stackoverflow.com/questions/29477391/cdc-data-only-shows-up-after-5-minutes, a developer found that CDC data only shows up after 5 minutes, and the reason is:
Because the capture process extracts change data from the transaction log, there is a built in latency between the time that a change is committed to a source table and the time that the change appears within its associated change table.
For example, low_watermark and high_watermark may be 5 minutes behind the LSN at which step 2 (reading the snapshot data) actually runs. Then, in the streaming phase, log entries that should be ignored are read again.
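To make the gap concrete, here is a minimal JDBC sketch (illustrative only, not the connector's code) of where the watermark LSN effectively comes from. It assumes, as described above, that the value is the newest LSN recorded in cdc.lsn_time_mapping (which sys.fn_cdc_get_max_lsn() returns and which only the capture job advances), so any change committed after the capture job's last scan is not yet covered by it; connection details are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Illustrative only: shows which LSN the watermark is effectively based on.
public class LsnLagDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:sqlserver://localhost:1433;databaseName=inventory",
                                "sa",
                                "Password!");
                Statement stmt = conn.createStatement();
                // sys.fn_cdc_get_max_lsn() returns the maximum start_lsn recorded in
                // cdc.lsn_time_mapping, i.e. the newest LSN the capture job has processed.
                ResultSet rs =
                        stmt.executeQuery("SELECT sys.fn_cdc_get_max_lsn() AS capture_max_lsn")) {
            rs.next();
            byte[] lsn = rs.getBytes(1);
            System.out.println("capture job max LSN: " + (lsn == null ? "null" : toHex(lsn)));
            // Changes committed to the transaction log after the capture job's last scan
            // are not reflected here yet, so a watermark taken at this moment can be
            // several minutes behind the true end of the log.
        }
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}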
How to verify
Modify the test com.ververica.cdc.connectors.oracle.source.OracleSourceITCase#testEnableBackfillWithDMLPostLowWaterMark as below; we can see that the three DML operations are read twice.
@Test
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
    List<String> records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);

    List<String> expectedRecords =
            Arrays.asList(
                    "+I[101, user_1, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "+I[103, user_3, Shanghai, 123567891234]",
                    "+I[109, user_4, Shanghai, 123567891234]",
                    "+I[110, user_5, Shanghai, 123567891234]",
                    "+I[111, user_6, Shanghai, 123567891234]",
                    "+I[118, user_7, Shanghai, 123567891234]",
                    "+I[121, user_8, Shanghai, 123567891234]",
                    "+I[123, user_9, Shanghai, 123567891234]",
                    "+I[1009, user_10, Shanghai, 123567891234]",
                    "+I[1010, user_11, Shanghai, 123567891234]",
                    "+I[1011, user_12, Shanghai, 123567891234]",
                    "+I[1012, user_13, Shanghai, 123567891234]",
                    "+I[1013, user_14, Shanghai, 123567891234]",
                    "+I[1014, user_15, Shanghai, 123567891234]",
                    "+I[1015, user_16, Shanghai, 123567891234]",
                    "+I[1016, user_17, Shanghai, 123567891234]",
                    "+I[1017, user_18, Shanghai, 123567891234]",
                    "+I[1018, user_19, Shanghai, 123567891234]",
                    "+I[2000, user_21, Pittsburgh, 123567891234]",
                    "+I[15213, user_15213, Shanghai, 123567891234]",
                    // these operations were already applied in the snapshot phase,
                    // but are read again in the streaming phase
                    "+I[15213, user_15213, Shanghai, 123567891234]",
                    "-U[2000, user_21, Shanghai, 123567891234]",
                    "+U[2000, user_21, Pittsburgh, 123567891234]",
                    "-D[1019, user_20, Shanghai, 123567891234]");
    assertEqualsInAnyOrder(expectedRecords, records);
}
private List<String> testBackfillWhenWritingEvents(
        boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
    createAndInitialize("customer.sql");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(200L);
    env.setParallelism(1);

    ResolvedSchema customersSchema =
            new ResolvedSchema(
                    Arrays.asList(
                            physical("ID", BIGINT().notNull()),
                            physical("NAME", STRING()),
                            physical("ADDRESS", STRING()),
                            physical("PHONE_NUMBER", STRING())),
                    new ArrayList<>(),
                    UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
    TestTable customerTable =
            new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
    String tableId = customerTable.getTableId();

    OracleSourceBuilder.OracleIncrementalSource source =
            OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
                    .hostname(ORACLE_CONTAINER.getHost())
                    .port(ORACLE_CONTAINER.getOraclePort())
                    .username(CONNECTOR_USER)
                    .password(CONNECTOR_PWD)
                    .databaseList(ORACLE_DATABASE)
                    .schemaList(ORACLE_SCHEMA)
                    .tableList("DEBEZIUM.CUSTOMERS")
                    .skipSnapshotBackfill(skipSnapshotBackfill)
                    .startupOptions(StartupOptions.initial())
                    .deserializer(customerTable.getDeserializer())
                    .build();

    // Do some database operations during hook in snapshot period.
    SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
    String[] statements =
            new String[] {
                String.format(
                        "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
                        tableId),
                String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
                String.format("DELETE FROM %s WHERE id=1019", tableId)
            };
    SnapshotPhaseHook snapshotPhaseHook =
            (sourceConfig, split) -> {
                // database update operations use TEST_USER rather than CONNECTOR_USER
                JdbcConfiguration configuration =
                        JdbcConfiguration.copy(
                                        ((JdbcSourceConfig) sourceConfig)
                                                .getDbzConnectorConfig()
                                                .getJdbcConfig())
                                .withUser("debezium")
                                .withPassword("dbz")
                                .build();
                try (OracleConnection oracleConnection =
                        OracleConnectionUtils.createOracleConnection(configuration)) {
                    oracleConnection.setAutoCommit(false);
                    oracleConnection.execute(statements);
                    oracleConnection.commit();
                }
            };

    if (hookType == USE_POST_LOWWATERMARK_HOOK) {
        hooks.setPostLowWatermarkAction(snapshotPhaseHook);
    } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
        hooks.setPreHighWatermarkAction(snapshotPhaseHook);
    }
    source.setSnapshotHooks(hooks);

    List<String> records = new ArrayList<>();
    try (CloseableIterator<RowData> iterator =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                    .executeAndCollect()) {
        records = fetchRowData(iterator, fetchSize, customerTable::stringify);
        env.close();
    }
    return records;
}
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
@gong , @GOODBOY008 , @leonardBang , @ruanhang1993 , @Shawn-Hx , CC, WDYT? By the way, I search a lot and even ask in stackoverflow, but cannot find how to get the real latest LSN of whole database system, would you like to give me some advice?
@loserwang1024 https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#_effect_of_sql_server_capture_job_agent_configuration_on_server_load_and_latency maybe we can set a wait time before getting the LSN?
@gong , a smaller low_watermark has no effect. But for high_watermark, what is the optimal waiting time to ensure that the updated data has been transferred to the CDC table?
@gong, thanks a lot. It seems pollinginterval is the parameter of sys.sp_cdc_change_job (https://learn.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-change-job-transact-sql?view=sql-server-ver16) that controls the interval at which the capture job scans for CDC data. So we can expose a parameter to users, whose value depends on the actual SQL Server setup, with a default of 5s (the same as the SQL Server default). Users can then tune it to their actual needs. Before taking the high_watermark, we would wait pollinginterval + 1s. A minimal sketch of this idea is shown below.
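A minimal sketch of that proposal, assuming a hypothetical connector option whose value should match the capture job's pollinginterval; the class, option, and helper method here are illustrative, not existing Flink CDC APIs:

import java.time.Duration;

// Sketch of waiting pollinginterval + 1s before taking the high watermark.
public class HighWatermarkWaitSketch {

    /** Wait time before reading the high watermark; pollinginterval + 1s as suggested above. */
    private final Duration highWatermarkWait;

    public HighWatermarkWaitSketch(Duration pollingInterval) {
        this.highWatermarkWait = pollingInterval.plusSeconds(1);
    }

    long takeHighWatermark() throws InterruptedException {
        // Give the CDC capture job time to copy the changes committed during the snapshot
        // read into cdc.lsn_time_mapping, so the LSN taken here actually covers them.
        Thread.sleep(highWatermarkWait.toMillis());
        return queryCurrentLsn();
    }

    // Placeholder: in the connector this corresponds to SqlServerDialect#displayCurrentOffset.
    private long queryCurrentLsn() {
        return 0L;
    }
}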
CC, @GOODBOY008 , WDYT?
Closing this issue as it has been migrated to Apache Jira.