[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations
What changes were proposed in this pull request?
This PR makes the following changes:
- Adding a `release()` method to the `ReadStateStore` interface to properly close read stores without aborting them
- Implementing a `getWriteStore()` method that allows converting a read-only store to a writable store
- Creating a `StateStoreRDDProvider` interface for tracking state stores by partition ID
- Enhancing `StateStoreRDD` to find and reuse existing state stores through RDD lineage
- Improving task completion handling with proper cleanup listeners
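As a rough illustration of the interface shape these changes imply, here is a minimal sketch in Java. All names and signatures below are simplified stand-ins: the real Spark traits (`ReadStateStore`, `StateStore`, `StateStoreProvider` in `org.apache.spark.sql.execution.streaming.state`) carry version, partition, and encoder parameters omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified shapes of the interfaces this PR touches.
interface ReadStateStore {
    String get(String key);
    // New in this PR: close a read store without invalidating its
    // loaded state, unlike abort(), which forces a reload next time.
    void release();
}

interface StateStore extends ReadStateStore {
    void put(String key, String value);
    long commit();
    void abort();
}

// Toy in-memory implementation, just to show the read-to-write upgrade.
class InMemoryStateStore implements StateStore {
    private final Map<String, String> data = new HashMap<>();
    private long version = 0;

    public String get(String key) { return data.get(key); }
    public void release() { /* keep loaded state; drop the read handle */ }
    public void put(String key, String value) { data.put(key, value); }
    public long commit() { return ++version; }
    public void abort() { data.clear(); }

    // Sketch of the new conversion: hand back the same underlying
    // store as a writable handle so its loaded state is reused.
    StateStore getWriteStore() { return this; }
}
```

The key design point is that `getWriteStore()` returns a handle over the same loaded state rather than opening a second store, so the read and write paths never hold two stores at once.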
Why are the changes needed?
Currently, stateful operations like aggregations follow a pattern where both read and write stores are opened simultaneously:

```
readStore.acquire()
writeStore.acquire()
writeStore.commit()
readStore.abort()
```

This pattern creates inefficiency because:
- The `abort()` call on the read store unnecessarily invalidates the store's state, causing subsequent operations to reload the entire state store from scratch
- Having two stores open simultaneously increases memory usage and can create contention issues
- The upcoming lock hardening changes will only allow one state store to be open at a time, making this pattern incompatible
With the new approach, the usage paradigm becomes:

```
readStore = getReadStore()
writeStore = getWriteStore(readStore)
writeStore.commit()
```

This new paradigm allows us to reuse an existing read store by converting it to a write store using `getWriteStore()`, and to properly clean up resources using `release()` instead of `abort()` when operations complete successfully. This avoids the unnecessary reloading of state data and improves performance, while remaining compatible with the future lock hardening changes.
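The reload cost being avoided can be shown with a toy provider that counts full loads. This is an illustrative Java simulation, not the real Spark provider API: `getReadStore`, `abortReadStore`, `releaseReadStore`, and `getWriteStore` here are hypothetical names standing in for the behavior described above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy provider illustrating the inefficiency: abort() invalidates the
// cached state (forcing a reload), release() keeps it for the next batch.
class ToyStateStoreProvider {
    private Map<String, String> cached; // state loaded from a checkpoint
    int loadCount = 0;                  // how many full reloads happened

    Map<String, String> getReadStore() {
        if (cached == null) {           // simulate an expensive load
            cached = new HashMap<>();
            loadCount++;
        }
        return cached;
    }

    // Old pattern: abort() throws the loaded state away.
    void abortReadStore() { cached = null; }

    // New pattern: release() keeps the loaded state cached, and
    // getWriteStore(readStore) hands the same instance back for writes.
    void releaseReadStore() { /* state stays cached */ }

    Map<String, String> getWriteStore(Map<String, String> readStore) {
        return readStore;
    }
}
```

Under the old pattern, every micro-batch that aborts its read store pays a full load on the next batch; under the new pattern the state loaded for the read path is handed directly to the write path and survives `release()`, so only the first batch pays the load cost.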
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
No
@ericm-db - in the PR description, please update it to say what the new usage paradigm will look like as well
@ericm-db Can you update the PR description to be more specific about the inefficiency we are addressing here? Basically that in the current impl, we always abort read store, triggering unnecessary reload of the state store.
@anishshri-db @liviazhu-db Sure yeah sounds good
cc @cloud-fan
cc @cloud-fan - could you PTAL too? Especially around the RDD interactions? Thx
Looks good! Could you add a test in StateStoreRDDSuite to check the ThreadLocal logic correctly passes the readstore to the writestore too?
Yup, working on that rn!