[DO-NOT-REVIEW] [SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager
What changes were proposed in this pull request?
This PR enables RocksDB to read <zip, changelog> files watermarked with unique ids (e.g. version_uniqueId.zip, version_uniqueId.changelog). Below is an explanation of the changes and the rationale.
Now for each changelog file, we put a "version: uniqueId" lineage on its first line, covering its current version back to the previous snapshot version. For each snapshot (zip) file, there is no change other than its name (version_uniqueId.zip), because snapshot files are the single source of truth.
In addition to LastCommitBasedCheckpointId, lastCommittedCheckpointId, and loadedCheckpointId added in https://github.com/apache/spark/pull/47895#pullrequestreview-2349097435, this PR also adds an in-memory map versionToUniqueIdLineage that maps version to unique id. This is useful when the same RocksDB instance is reused in the executor, so the lineage does not need to be loaded from the changelog file again.
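The naming scheme described above can be sketched as follows. This is a minimal illustration, not the code in this PR: the object CheckpointFileNames and the method fileName are hypothetical names, and only the version_uniqueId.zip / version_uniqueId.changelog patterns come from the description.

```scala
// Hypothetical helper; the real RocksDBFileManager code may be structured differently.
object CheckpointFileNames {
  // Checkpoint v2 embeds the unique id in the file name; v1 uses the bare version.
  def fileName(version: Long, uniqueId: Option[String], suffix: String): String =
    uniqueId match {
      case Some(id) => s"${version}_$id.$suffix"
      case None => s"$version.$suffix"
    }
}
```

For example, CheckpointFileNames.fileName(5L, Some("abc"), "changelog") yields a v2-style name, while passing None yields the v1-style name.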
RocksDB:
Load
- When loadedVersion != version, try to load a changelog file with version_checkpointUniqueId.changelog. If the load fails, there could be multiple cases:
  i. Version corresponds to a zip file:
    a. version_uniqueId.zip: either changelog was not enabled, or it was enabled but the version happens to be a checkpoint file, and the query previously ran with ckpt v2.
    b. version.zip: either changelog was not enabled, or it was enabled but the version happens to be a checkpoint file, and the query previously ran with ckpt v1.
  ii. Version corresponds to a changelog file:
    a. version_uniqueId.changelog: changelog was enabled, and the query previously ran with ckpt v2.
    b. version.changelog: changelog was enabled, and the query previously ran with ckpt v1.
- For case i.a, we read the lineage file stored in version_uniqueId.changelog.
- For case ii.a, we construct a new empty lineage (version, sessionCheckpointId).
- For case i.b and ii.b, there is no need to load the lineage as it was not present before; we just load the corresponding file without uniqueId, but newer files will be constructed with uniqueId.
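As a rough illustration of the four cases, the sketch below classifies a version by which file name is present in a directory listing. The LoadCases object, the case names, and the lookup order are all assumptions made for this example; the PR itself decides based on whether loading the file succeeds, not on a name lookup.

```scala
// Hypothetical classification of the cases listed above.
object LoadCases {
  sealed trait LoadCase
  case object SnapshotV2 extends LoadCase  // case i.a: version_uniqueId.zip
  case object SnapshotV1 extends LoadCase  // case i.b: version.zip
  case object ChangelogV2 extends LoadCase // case ii.a: version_uniqueId.changelog
  case object ChangelogV1 extends LoadCase // case ii.b: version.changelog

  // Checks v2 names before v1 names; this ordering is an assumption of the sketch.
  def classify(files: Set[String], version: Long, uniqueId: String): Option[LoadCase] =
    if (files.contains(s"${version}_$uniqueId.zip")) Some(SnapshotV2)
    else if (files.contains(s"${version}_$uniqueId.changelog")) Some(ChangelogV2)
    else if (files.contains(s"$version.zip")) Some(SnapshotV1)
    else if (files.contains(s"$version.changelog")) Some(ChangelogV1)
    else None
}
```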
Next the code finds the latest snapshot version. When there are multiple snapshot files with the same version but different unique ids (the main problem this project is trying to solve), the correct one is loaded based on the checkpoint id.
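One way to picture the snapshot selection is the sketch below. It assumes the lineage is available as a Map from version to unique id and that snapshot files follow the version_uniqueId.zip pattern; SnapshotSelection and latestSnapshot are hypothetical names, not the methods in this PR.

```scala
// Hypothetical sketch: pick the newest snapshot whose unique id matches the lineage.
object SnapshotSelection {
  // Matches "version_uniqueId.zip"; names without a unique id are ignored here.
  private val V2Snapshot = """(\d+)_(.+)\.zip""".r

  def latestSnapshot(
      files: Seq[String],
      lineage: Map[Long, String],
      targetVersion: Long): Option[(Long, String)] =
    files
      .collect { case V2Snapshot(v, id) => (v.toLong, id) }
      // Keep only snapshots at or below the target whose id is the one in the lineage.
      .filter { case (v, id) => v <= targetVersion && lineage.get(v).contains(id) }
      .sortBy(_._1)
      .lastOption
}
```

With two files for version 3 that differ only in unique id, only the one recorded in the lineage is eligible, so a stale file from a failed or speculative attempt is never picked.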
Then changelog is replayed with the awareness of lineage. The lineage is stored in memory for next load().
Finally, load the changelog writer for version + 1, and write the lineage (version + 1, sessionCheckpointId) to the first line of the file. Although the lineage appears to be written early, this is safe because the changelog writer is not committed yet.
- When loadedVersion == version, the same RocksDB instance is reused and the lineage is already stored in memory in versionToUniqueIdLineage.
Commit
- Also save sessionCheckpointId to latestSnapshot
- Add (newVersion, sessionCheckpointId) to versionToUniqueIdLineage
Abort
Also clear up versionToUniqueIdLineage
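The load, commit, and abort handling of the in-memory lineage can be sketched as below. Only the field name versionToUniqueIdLineage comes from this PR; the LineageCache class and its method names are hypothetical and exist only to show the lifecycle.

```scala
import scala.collection.mutable

// Hypothetical wrapper around the in-memory lineage map.
class LineageCache {
  private val versionToUniqueIdLineage = mutable.Map.empty[Long, String]

  // After a load from files, replace whatever was cached with the lineage just read.
  def replaceWith(lineage: Seq[(Long, String)]): Unit = {
    versionToUniqueIdLineage.clear()
    versionToUniqueIdLineage ++= lineage
  }

  // On commit, record the unique id of the newly committed version.
  def onCommit(newVersion: Long, sessionCheckpointId: String): Unit =
    versionToUniqueIdLineage(newVersion) = sessionCheckpointId

  // On abort, drop the cached lineage so the next load rebuilds it from files.
  def onAbort(): Unit = versionToUniqueIdLineage.clear()

  def uniqueIdFor(version: Long): Option[String] = versionToUniqueIdLineage.get(version)
}
```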
RocksDBFileManager:
- A number of additions to make the util code uniqueId aware.
deleteOldVersions: (need test)
If there are multiple files for the same version, e.g. version_uniqueId1.zip (or .changelog) and version_uniqueId2.zip, all of them are deleted.
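A minimal sketch of that behavior, assuming deletion is driven by a minimum version to retain: every file whose version is below the threshold is selected, regardless of its unique id. OldVersionCleanup, filesToDelete, and minVersionToRetain are names invented for this example.

```scala
// Hypothetical sketch of unique-id-agnostic cleanup.
object OldVersionCleanup {
  // Matches both "version.zip" / "version.changelog" and the "version_uniqueId" variants.
  private val Versioned = """(\d+)(?:_[^.]+)?\.(?:zip|changelog)""".r

  def filesToDelete(files: Seq[String], minVersionToRetain: Long): Seq[String] =
    files.filter {
      case Versioned(v) => v.toLong < minVersionToRetain
      case _ => false // leave unrelated files alone
    }
}
```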
Changelog Reader / Writer
We propose to save the lineage to the first line of the changelog files.
For the changelog reader, a new abstract function readLineage is added. In the RocksDBCheckpointManager.getChangelogReader function, readLineage is called right after the changelog reader is initialized, which moves the file pointer past the lineage. Subsequent getNext calls are therefore not affected.
For the changelog writer, a new abstract function writeLineage writes the lineage. It is called in RocksDB.load(), before any actual changelog data is written.
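The header-then-records layout can be illustrated with plain java.io classes. The text encoding used here (comma-separated version:uniqueId pairs on one line) is an assumption for the sketch, as is the LineageHeader object; the actual readLineage and writeLineage in this PR operate on the changelog reader and writer streams and may serialize the lineage differently.

```scala
import java.io.{BufferedReader, StringReader, StringWriter}

// Hypothetical sketch of writing and reading a lineage header line.
object LineageHeader {
  // Writes the lineage as the first line, before any changelog records.
  def writeLineage(out: StringWriter, lineage: Seq[(Long, String)]): Unit = {
    out.write(lineage.map { case (v, id) => s"$v:$id" }.mkString(","))
    out.write("\n")
  }

  // Consumes exactly the first line, leaving the reader positioned at the first record.
  def readLineage(in: BufferedReader): Seq[(Long, String)] = {
    val line = in.readLine()
    if (line == null || line.isEmpty) Seq.empty
    else line.split(",").toSeq.map { entry =>
      entry.split(":", 2) match {
        case Array(v, id) => (v.toLong, id)
        case _ => throw new IllegalArgumentException(s"Malformed lineage entry: $entry")
      }
    }
  }
}
```

Because readLineage consumes the header before any record is requested, code that iterates over records afterwards does not need to know the header exists.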
Why are the changes needed?
Improve the fault tolerance of the RocksDB State Store.
Does this PR introduce any user-facing change?
Not yet. After the V2 format project is finished, customers can use the new config to enable it and get better RocksDB state store fault tolerance.
How was this patch tested?
Modified existing unit tests. For unit tests and backward compatibility tests please refer to: https://github.com/apache/spark/pull/48356
Was this patch authored or co-authored using generative AI tooling?
No
This PR depends on https://github.com/apache/spark/pull/47932 and https://github.com/apache/spark/pull/47895
Thank you for addressing the comments! Merging to master
I'm having trouble merging. Asking @HeartSaVioR for help in merging
I'll take over and keep helping to merge till @brkyvz gets his power again :)
Thanks! Merging to master (on behalf of @brkyvz )