[FLINK-36086] Set isSnapshotCompleted only immediately after snapshot
If the SQL Server source connector is restarted while processing a transaction that contains multiple updates, then upon restart it skips the unprocessed changes of that transaction and resumes from the next transaction.
This is an analog of DBZ-1128 but reproducible only in Flink CDC.
This is a regression introduced in #2176.
Lower-level details
The isSnapshotCompleted offset context flag in Debezium tells the source connector to jump one transaction ahead at the beginning of the streaming phase. This is only necessary during the transition from the initial state snapshot to streaming. Without this flag, the streaming change data source would re-emit the changes from the transaction that was already included in the snapshot. This is the issue that #2176 attempted to address.
The following line sets this flag unconditionally for the stream split: https://github.com/apache/flink-cdc/blob/c5396fbf29ae3a48bb13e38d113b8be674861a22/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java#L59
It makes Debezium jump one transaction ahead not only after completing the initial state snapshot but every time streaming starts, including after a restart. As a result, it may skip the remaining changes of a transaction that wasn't fully processed before the restart.
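The interaction between the flag and the stored offset can be illustrated with a simplified, self-contained model. It assumes an offset made of a transaction commit LSN, the position of the last change already emitted from that transaction, and the flag; the names Change, Offset and stream are hypothetical and are not Debezium or Flink CDC APIs. In this model a set flag skips the whole transaction at the stored commit LSN, which is what is wanted right after the snapshot but drops unprocessed changes after a mid-transaction restart.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of where streaming resumes; not Debezium code. */
public class SnapshotHandoffModel {

    /** One change event: the commit LSN of its transaction and its position within it. */
    record Change(long commitLsn, int seq) {}

    /**
     * Stored offset: commit LSN of the transaction being processed, the last change
     * already emitted from it (-1 if none), and the snapshot-completed flag.
     */
    record Offset(long commitLsn, int lastSeq, boolean snapshotCompleted) {}

    /** Returns the changes the stream phase would emit when started from {@code offset}. */
    static List<Change> stream(List<Change> log, Offset offset) {
        List<Change> emitted = new ArrayList<>();
        for (Change c : log) {
            if (c.commitLsn() < offset.commitLsn()) {
                continue; // transaction fully handled before the offset
            }
            if (c.commitLsn() == offset.commitLsn()) {
                if (offset.snapshotCompleted()) {
                    continue; // flag set: jump over the whole transaction at the offset
                }
                if (c.seq() <= offset.lastSeq()) {
                    continue; // flag unset: skip only the changes already emitted
                }
            }
            emitted.add(c);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Transaction 10 has one change, 20 has three, 30 has one.
        List<Change> log = List.of(
                new Change(10, 0),
                new Change(20, 0), new Change(20, 1), new Change(20, 2),
                new Change(30, 0));

        // Snapshot handoff: tx 20 is already in the snapshot, nothing streamed from it yet.
        System.out.println(stream(log, new Offset(20, -1, false))); // re-emits all of tx 20, then tx 30
        System.out.println(stream(log, new Offset(20, -1, true)));  // emits only tx 30
        // Restart after changes 0 and 1 of tx 20 were already emitted.
        System.out.println(stream(log, new Offset(20, 1, true)));   // change 2 of tx 20 is lost
        System.out.println(stream(log, new Offset(20, 1, false)));  // resumes with change 2 of tx 20
    }
}
```

The third call corresponds to the scenario described above: with the flag set unconditionally, the rest of the interrupted transaction is never emitted.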
The proposed solution is to set it only during the transition from the initial state snapshot to streaming.
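A minimal sketch of the proposed rule, assuming the caller can tell whether the stream phase starts directly after the initial snapshot or resumes from a previously committed streaming offset. How Flink CDC makes that distinction is not shown here; StreamStart, OffsetFlags, markSnapshotCompleted and both start methods are hypothetical stand-ins for the offset context call in SqlServerStreamFetchTask.

```java
/** Hypothetical sketch of when to set the snapshot-completed flag; not Flink CDC code. */
public class SnapshotFlagPolicy {

    /** How the stream phase is being started (hypothetical classification). */
    enum StreamStart { AFTER_INITIAL_SNAPSHOT, RESUMED_FROM_STREAM_OFFSET }

    /** Minimal stand-in for an offset context that carries the flag. */
    static final class OffsetFlags {
        private boolean snapshotCompleted;

        void markSnapshotCompleted() { snapshotCompleted = true; }

        boolean isSnapshotCompleted() { return snapshotCompleted; }
    }

    /** Previous behavior: the flag is set on every stream start; {@code start} is ignored. */
    static OffsetFlags startUnconditionally(StreamStart start) {
        OffsetFlags flags = new OffsetFlags();
        flags.markSnapshotCompleted();
        return flags;
    }

    /** Proposed behavior: the flag is set only on the snapshot-to-streaming transition. */
    static OffsetFlags startConditionally(StreamStart start) {
        OffsetFlags flags = new OffsetFlags();
        if (start == StreamStart.AFTER_INITIAL_SNAPSHOT) {
            flags.markSnapshotCompleted();
        }
        return flags;
    }

    public static void main(String[] args) {
        for (StreamStart start : StreamStart.values()) {
            System.out.printf("%s: before=%b, after=%b%n", start,
                    startUnconditionally(start).isSnapshotCompleted(),
                    startConditionally(start).isSnapshotCompleted());
        }
    }
}
```

Running main prints the flag for both start modes; the two policies differ only when streaming resumes from a committed streaming offset, which is exactly the restart case that should not skip a transaction.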
Testing considerations
- In order to implement a test for the scenario in question, I would need a way to restart the connector in the middle of processing a transaction (something like start(..., Predicate<SourceRecord> isStopRecord) in the Debezium testing framework). How could I implement a similar case in Flink CDC?
- The test implemented in https://github.com/apache/flink-cdc/pull/2176 doesn't fail even if the fix for the issue it covers is reverted, so at this point it's not clear how to reproduce the original issue.
@GOODBOY008 Would you like to take a look at this PR when you have time?
@GOODBOY008 do you have an idea why the test introduced in https://github.com/apache/flink-cdc/pull/2176 passes even with the fix reverted? I'd like to make sure that I'm not breaking the original fix. I would also appreciate advice on testing the restart-mid-transaction scenario, if that's possible.
@morozov, I tried removing sourceFetchContext.getOffsetContext().preSnapshotCompletion(); and the unit test still passed. I'm looking into it.
This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.
@leonardBang, @GOODBOY008, what are the next steps to get this merged?
Hey @GOODBOY008, would you like to review this PR when you have time?
@leonardBang this PR is getting merge conflicts and may require extra care. Are you interested in getting this applied?
@morozov I will design and add unit tests to verify the effectiveness of the changes.
Fixed in https://github.com/apache/flink-cdc/pull/3873.