
source-mssql: enable heartbeats and checkpointing

Open postamar opened this issue 1 year ago • 3 comments

The implementation of heartbeat support for MS SQL Server is straightforward.

This is not ready to merge, as there is still at least one outstanding bug: CdcMssqlSourceTest.testRecordsProducedDuringAndAfterSync fails because the second read contains a duplicate record (namely, the last record of the previous batch) when heartbeats are enabled. I haven't been able to fix this. All I've noticed is that the state passed to the second read differs depending on whether heartbeats are enabled.
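For reference, the failing check amounts to asserting that the two reads don't overlap. A minimal Python sketch of that assertion, assuming records can be compared by a hashable key such as the primary key; find_replayed_records is a hypothetical helper, not the code in CdcMssqlSourceTest:

```python
from typing import Hashable, Iterable, List


def find_replayed_records(first_read: Iterable[Hashable],
                          second_read: Iterable[Hashable]) -> List[Hashable]:
    """Return the records of the second read that the first read already emitted.

    Records are compared by a hashable key (e.g. a primary-key value); that
    choice of key is an assumption of this sketch.
    """
    already_emitted = set(first_read)
    return [record for record in second_read if record in already_emitted]


# Illustrative ids: 1-3 come from the first read, 4-5 were inserted afterwards.
# The repeated 3 mimics the symptom: the last record of the previous batch
# shows up again at the start of the second read.
first_read = [1, 2, 3]
second_read = [3, 4, 5]
print(find_replayed_records(first_read, second_read))  # [3]
```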

Another noticeable difference, probably related to the state difference: with heartbeats enabled (i.e. heartbeat.interval.ms set to a value greater than 0), the first read ends with a heartbeat event.
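Whether heartbeats are on comes down to that single property. A small Python sketch of the check, treating the Debezium configuration as a string-to-string map the way java.util.Properties stores it; the helper name and the 10000 ms value are illustrative assumptions:

```python
from typing import Mapping


def heartbeats_enabled(debezium_props: Mapping[str, str]) -> bool:
    """True when heartbeat.interval.ms is a positive number of milliseconds.

    Debezium defaults heartbeat.interval.ms to 0, which disables heartbeats,
    so a missing key is treated the same as "0".
    """
    return int(debezium_props.get("heartbeat.interval.ms", "0")) > 0


# Hypothetical property sets; the 10000 ms interval is an arbitrary example.
print(heartbeats_enabled({}))                                   # False
print(heartbeats_enabled({"heartbeat.interval.ms": "10000"}))   # True
```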

postamar • Dec 14 '23

  • master
    • #33510
      • #33511 👈

This stack of pull requests is managed by Graphite.

postamar • Dec 14 '23

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • [ ] PR name follows PR naming conventions
  • [ ] Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • [ ] Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • [ ] You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • [ ] Secrets in the connector's spec are annotated with airbyte_secret
  • [ ] All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • [ ] Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • [ ] Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • [ ] If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

github-actions[bot] • Dec 14 '23

This PR will have to wait until we implement initial snapshots ourselves for source-mssql.

I'm finally beginning to understand what's going on here. For a reason that's still mysterious to me, when heartbeats are enabled the initial read completes with a Debezium state (in the offset file) that does not include the fields indicating that the snapshot has completed. This causes the Debezium Engine in the subsequent read to include the record at the starting LSN instead of skipping to the next log position, so that record is effectively read a second time.
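When comparing the states from the two runs, it helps to check the offset value for those fields directly. A minimal Python sketch, assuming the offset value is a JSON string shaped like the offset-file example later in this thread; the helper name is hypothetical, and the second value is made up for illustration rather than captured from a real run:

```python
import json


def has_snapshot_completion_fields(offset_value: str) -> bool:
    """Check a SQL Server offset value for the snapshot-completion fields.

    Assumes the value is a JSON object shaped like the offset-file example
    in this thread. This only looks for the two fields; it does not model
    how Debezium itself interprets the offset.
    """
    offset = json.loads(offset_value)
    return offset.get("snapshot") is True and offset.get("snapshot_completed") is True


complete = '{"commit_lsn":"0000093b:000155f8:0003","snapshot":true,"snapshot_completed":true}'
incomplete = '{"commit_lsn":"0000093b:000155f8:0003"}'  # illustrative only
print(has_snapshot_completion_fields(complete))    # True
print(has_snapshot_completion_fields(incomplete))  # False
```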

The solution is to skip Debezium's snapshotting entirely; we mean to implement it ourselves anyway. Once that's in place, let's set snapshot.mode to schema_only in the Debezium properties at all times, and the problem should never manifest itself.
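Pinning the mode is a one-line override on top of whatever properties the connector already builds. A Python sketch of that override, assuming a string-to-string property map; the helper name and the base values are hypothetical:

```python
from typing import Dict, Mapping


def with_schema_only_snapshot(base_props: Mapping[str, str]) -> Dict[str, str]:
    """Return a copy of the Debezium properties with snapshot.mode pinned.

    In schema_only mode Debezium captures table structures but emits no
    rows; the row-level initial load is assumed to be the connector's job.
    """
    props = dict(base_props)
    props["snapshot.mode"] = "schema_only"  # overrides whatever was set before
    return props


base = {"snapshot.mode": "initial", "heartbeat.interval.ms": "10000"}  # hypothetical
props = with_schema_only_snapshot(base)
print(props["snapshot.mode"])  # schema_only
print(base["snapshot.mode"])   # initial (the input is left untouched)
```

Returning a copy keeps the caller's base configuration reusable, for example in tests that compare runs with and without the override.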

cc @rodireich @xiaohansong

postamar • Jan 08 '24

A question about setting snapshot mode to schema_only: I wonder whether Debezium will ever run a snapshot at all, given that we are going to hack the offset file. The content of the offset file would be something like:

"[\"test\",{\"server\":\"test\",\"database\":\"test\"}]" : "{\"commit_lsn\":\"0000093b:000155f8:0003\",\"snapshot\":true,\"snapshot_completed\":true}"

In my understanding, when Debezium reads this file it will skip the snapshot completely and just do incremental syncs going forward. Do we really need a schema_only snapshot in this case?
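Generating that entry programmatically is mostly a matter of getting the nesting and escaping right. A Python sketch, assuming the first element of the key is the engine (connector) name, as Kafka Connect-style offset keys appear to use, and that the file stores both sides as JSON-encoded strings the way the example shows; build_offset_entry is a hypothetical helper:

```python
import json
from typing import Tuple


def build_offset_entry(engine_name: str, server: str, database: str,
                       commit_lsn: str) -> Tuple[str, str]:
    """Build the (key, value) pair of a hand-made SQL Server offset.

    The key is [engine name, partition] and the value marks the snapshot as
    already completed, mirroring the example above. Compact separators
    reproduce its exact spacing; dict insertion order fixes the field order.
    """
    key = json.dumps([engine_name, {"server": server, "database": database}],
                     separators=(",", ":"))
    value = json.dumps({"commit_lsn": commit_lsn, "snapshot": True,
                        "snapshot_completed": True},
                       separators=(",", ":"))
    return key, value


key, value = build_offset_entry("test", "test", "test", "0000093b:000155f8:0003")
# Each side is JSON-encoded once more, which yields the escaped form shown above.
print(f"{json.dumps(key)} : {json.dumps(value)}")
```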

xiaohansong • Jan 08 '24

You're right that at that point we won't need Debezium to capture a snapshot anymore. However, we still need to capture all the schema changes somehow; at least, that was the case for MySQL: https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mysql/MySqlDebeziumStateUtil.java#L234

postamar • Jan 09 '24

I read the code more carefully, and it appears I had misunderstood the MySQL workflow. The same problem occurs in MSSQL too: to hack Debezium into skipping its own snapshot and going straight to incremental sync, we need to provide both an offset file and a schema history file.

@akashkulk can keep me honest, but here's what I found. In MySQL, the solution is:

  1. run Debezium with snapshot.mode=schema_only_recovery to capture the schema history file, then load the result into state
  2. hack the offset file by querying the latest binlog position directly from MySQL
  3. run the MySQL initial load with our own recoverable PK queries
  4. then run Debezium in incremental sync mode

Borrowing this idea and applying it to MSSQL, we do indeed need the snapshot.mode=schema_only property, and we need to run a mini schema-only "snapshot" before running any syncs.
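Put together, the MSSQL version of the MySQL recipe might be ordered as follows. This Python sketch only encodes the order of operations; every callable is a hypothetical placeholder, and reading the max LSN could, for instance, go through SQL Server's sys.fn_cdc_get_max_lsn() (an assumption, not something decided in this thread):

```python
from typing import Callable, Dict, List


def first_sync(capture_schema_history: Callable[[], str],
               read_max_lsn: Callable[[], str],
               run_initial_load: Callable[[], None],
               stream_changes: Callable[[str, Dict[str, object]], None]) -> Dict[str, object]:
    """Run a first sync in the order proposed in this thread (all callables hypothetical)."""
    # 1. Mini "snapshot": Debezium with snapshot.mode=schema_only records the
    #    schema history but emits no rows.
    schema_history = capture_schema_history()
    # 2. Build the offset by hand from the current max LSN, marked as a finished
    #    snapshot so Debezium never attempts its own snapshot.
    offset: Dict[str, object] = {
        "commit_lsn": read_max_lsn(),
        "snapshot": True,
        "snapshot_completed": True,
    }
    # 3. Initial load with the connector's own resumable primary-key queries.
    run_initial_load()
    # 4. Stream changes from the saved LSN; rows changed during step 3 are
    #    presumably picked up here because the offset predates the load.
    stream_changes(schema_history, offset)
    return offset


calls: List[str] = []
first_sync(
    capture_schema_history=lambda: calls.append("schema") or "history",
    read_max_lsn=lambda: calls.append("lsn") or "0000093b:000155f8:0003",
    run_initial_load=lambda: calls.append("load"),
    stream_changes=lambda history, offset: calls.append("stream"),
)
print(calls)  # ['schema', 'lsn', 'load', 'stream']
```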

xiaohansong • Jan 10 '24