
[DO-NOT-REVIEW] [SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager


What changes were proposed in this pull request?

This PR enables RocksDB to read <zip, changelog> files tagged with unique ids (e.g. version_uniqueId.zip, version_uniqueId.changelog). Below is an explanation of the changes and the rationale.

Now, for each changelog file, we write "version: uniqueId" lineage entries to its first line, covering the range from its current version back to the previous snapshot version. For each snapshot (zip) file, nothing changes other than its name (version_uniqueId.zip), because snapshot files are the single source of truth.

In addition to LastCommitBasedCheckpointId, lastCommittedCheckpointId, and loadedCheckpointId added in https://github.com/apache/spark/pull/47895#pullrequestreview-2349097435, this PR also adds an in-memory map versionToUniqueIdLineage that maps a version to its unique id. This is useful when the same RocksDB instance is reused on the executor, so the lineage does not need to be loaded from the changelog file again.
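
As a rough illustration of this bookkeeping, here is a minimal sketch; the object name and the lineage encoding are assumptions for illustration, not the actual implementation:

```scala
import scala.collection.mutable

// Illustrative sketch only: an in-memory map from version to the uniqueId
// used in that version's file names (version_uniqueId.zip / .changelog).
// Keeping it in memory lets a reused RocksDB instance skip re-reading the
// lineage from changelog files on the next load().
object LineageStateSketch {
  val versionToUniqueIdLineage: mutable.Map[Long, String] = mutable.Map.empty

  // Hypothetical encoding of the lineage line written as the first line of a
  // v2 changelog file, covering versions back to the previous snapshot, e.g.
  //   "12:uuid-c 11:uuid-b 10:uuid-a"
  def encodeLineage(lineage: Seq[(Long, String)]): String =
    lineage.map { case (v, id) => s"$v:$id" }.mkString(" ")
}
```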

RocksDB:

Load

  • When loadedVersion != version, try to load a changelog file named version_checkpointUniqueId.changelog. If the load fails, there are several possible cases (a simplified sketch of these cases follows at the end of this Load section):

    i. The version corresponds to a zip file:
      a. version_uniqueId.zip: either changelog was not enabled, or it was enabled but this version happens to be a snapshot version, and the previous query run used checkpoint v2
      b. version.zip: either changelog was not enabled, or it was enabled but this version happens to be a snapshot version, and the previous query run used checkpoint v1
    ii. The version corresponds to a changelog file:
      a. version_uniqueId.changelog: changelog was enabled, and the previous query run used checkpoint v2
      b. version.changelog: changelog was enabled, and the previous query run used checkpoint v1
  • For case i.a, we construct a new lineage containing only (version, sessionCheckpointId), since the snapshot file is the single source of truth.

  • For case ii.a, we read the lineage stored in the first line of version_uniqueId.changelog.

  • For cases i.b and ii.b, there is no need to load any lineage because none was ever written; we simply load the corresponding file without a uniqueId, and newer files will then be written with a uniqueId.

Next, the code finds the latest snapshot version. When there are multiple snapshot files with the same version but different unique ids (the main problem this project is trying to solve), the correct one is loaded based on the checkpoint id.

Then the changelog is replayed with awareness of the lineage. The lineage is stored in memory for the next load().

Finally, open the changelog writer for version + 1 and write the lineage, including the new entry (version + 1, sessionCheckpointId), to the first line of the file. Although the lineage appears to be written early, this is safe because the changelog writer has not been committed yet.

  • When loadedVersion == version, the same RocksDB instance is reused, and the lineage is taken from the in-memory versionToUniqueIdLineage.
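
As referenced above, here is a simplified sketch of the four load cases; the helper name, directory layout, and resolution order are illustrative assumptions, not the actual RocksDBFileManager logic:

```scala
import java.io.File

object LoadResolutionSketch {
  // Hypothetical resolution of the file to load for a given version.
  // Tries v2 names (with uniqueId) first, then falls back to v1 names.
  def resolveCheckpointFile(dir: File, version: Long, uniqueId: Option[String]): Option[File] = {
    val candidates = Seq(
      // case ii.a: v2 changelog, lineage is read from its first line
      uniqueId.map(id => new File(dir, s"${version}_${id}.changelog")),
      // case i.a: v2 snapshot, lineage becomes just (version, uniqueId)
      uniqueId.map(id => new File(dir, s"${version}_${id}.zip")),
      // case ii.b: v1 changelog written before checkpoint v2 was enabled
      Some(new File(dir, s"${version}.changelog")),
      // case i.b: v1 snapshot written before checkpoint v2 was enabled
      Some(new File(dir, s"${version}.zip"))
    ).flatten
    candidates.find(_.exists())
  }
}
```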

Commit

  • Also save sessionCheckpointId to latestSnapshot
  • Add (newVersion, sessionCheckpointId) to versionToUniqueIdLineage

Abort

Also clear versionToUniqueIdLineage
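
A tiny sketch of this commit/abort bookkeeping, using hypothetical method names:

```scala
import scala.collection.mutable

// Illustrative only: how the in-memory lineage might be updated on
// commit and abort, per the description above.
class LineageBookkeepingSketch {
  private val versionToUniqueIdLineage = mutable.Map.empty[Long, String]

  // Commit: record the uniqueId the new version was committed with, so a
  // subsequent load() of the same instance can reuse it from memory.
  def onCommit(newVersion: Long, sessionCheckpointId: String): Unit =
    versionToUniqueIdLineage.put(newVersion, sessionCheckpointId)

  // Abort: the pending version is discarded, so drop the cached lineage.
  def onAbort(): Unit = versionToUniqueIdLineage.clear()
}
```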

RocksDBFileManager:

  • A number of additions to make the existing code uniqueId-aware.

deleteOldVersions (needs tests):

If there are multiple files for the same old version with different unique ids, e.g. version_uniqueId1.zip (or .changelog) and version_uniqueId2.zip, all of them are deleted.
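
A rough sketch of how old-version cleanup might match both v1 and v2 file names; the regex and helpers are illustrative assumptions, not the actual deleteOldVersions implementation:

```scala
import java.io.File

object CleanupSketch {
  // Matches both "12.zip" / "13.changelog" (checkpoint v1) and
  // "12_uuid-a.zip" / "13_uuid-b.changelog" (checkpoint v2).
  private val FilePattern = """(\d+)(?:_[^.]+)?\.(zip|changelog)""".r

  def versionOf(fileName: String): Option[Long] = fileName match {
    case FilePattern(version, _) => Some(version.toLong)
    case _ => None
  }

  // All files below the retention threshold are candidates for deletion,
  // including multiple files for the same version that differ only in uniqueId.
  def filesToDelete(dir: File, minVersionToRetain: Long): Seq[File] =
    Option(dir.listFiles()).getOrElse(Array.empty[File]).toSeq
      .filter(f => versionOf(f.getName).exists(_ < minVersionToRetain))
}
```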

Changelog Reader / Writer

We propose to save the lineage in the first line of the changelog files.

For the changelog reader, an abstract function readLineage is added. In the RocksDBCheckpointManager.getChangelogReader function, readLineage is called right after the changelog reader is initialized, advancing the file pointer past the lineage. Subsequent getNext calls are therefore not affected.

For the changelog writer, there is an abstract function writeLineage that writes the lineage. It is called in RocksDB.load(), before any actual changelog data is written.
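
A minimal sketch of these reader/writer hooks; the trait names and the text-based encoding are simplified assumptions (the real classes wrap compressed binary streams):

```scala
import java.io.{BufferedReader, BufferedWriter}

// Illustrative only: abstract lineage hooks on the changelog writer/reader.
trait ChangelogWriterSketch {
  protected def out: BufferedWriter

  // Called from RocksDB.load(), before any changelog records are written,
  // so the lineage always occupies the first line of the file.
  def writeLineage(lineage: Seq[(Long, String)]): Unit = {
    out.write(lineage.map { case (v, id) => s"$v:$id" }.mkString(" "))
    out.newLine()
  }
}

trait ChangelogReaderSketch {
  protected def in: BufferedReader

  // Called right after the reader is constructed, advancing the stream past
  // the lineage line so subsequent getNext() calls only see data records.
  def readLineage(): Seq[(Long, String)] =
    in.readLine().split(" ").toSeq.map { token =>
      val Array(v, id) = token.split(":", 2)
      (v.toLong, id)
    }
}
```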

Why are the changes needed?

Improve fault tolerance of the RocksDB state store.

Does this PR introduce any user-facing change?

Not yet. After the V2 format project is finished, customers can use the new config to enable it and get better RocksDB state store fault tolerance.

How was this patch tested?

Modified existing unit tests. For unit tests and backward compatibility tests, please refer to https://github.com/apache/spark/pull/48356

Was this patch authored or co-authored using generative AI tooling?

No

WweiL (Oct 04 '24 20:10)

This PR depends on https://github.com/apache/spark/pull/47932 and https://github.com/apache/spark/pull/47895

WweiL (Oct 04 '24 20:10)

Thank you for addressing the comments! Merging to master

brkyvz (Dec 13 '24 01:12)

I'm having trouble merging. Asking @HeartSaVioR for help in merging

brkyvz (Dec 13 '24 02:12)

I'll take over and keep helping to merge till @brkyvz gets his power again :)

Thanks! Merging to master (on behalf of @brkyvz )

HeartSaVioR (Dec 13 '24 02:12)