[SPARK-49021][SS] Add support for reading transformWithState value state variables with state data source reader
What changes were proposed in this pull request?
Add support for reading transformWithState value state variables with the state data source reader.
Co-authored with @jingz-db
Why are the changes needed?
These changes are needed to integrate state reading with the new operator metadata and state schema format for the value state types used by state variables within transformWithState.
Does this PR introduce any user-facing change?
Yes
Users can now use the state data source reader to read ValueState variables used in the transformWithState operator:
spark
.read
.format("statestore")
.option("operatorId", <operatorId>)
.option("stateVarName", <varName>)
.load(<state path>)
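As a hedged illustration, the snippet above can be filled in with concrete values. The checkpoint path `/tmp/tws-checkpoint`, operator id `0`, and state variable name `countState` are hypothetical. They assume an earlier streaming query that ran transformWithState with a ValueState variable named `countState` and wrote its checkpoint to that path. The sketch also assumes a local Spark build that includes this change.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadValueStateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-tws-value-state")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical checkpoint location of a query that used transformWithState
    val checkpointDir = "/tmp/tws-checkpoint"

    // Read the value state variable through the state data source reader
    val stateDf: DataFrame = spark.read
      .format("statestore")
      .option("operatorId", 0L)             // hypothetical operator id
      .option("stateVarName", "countState") // hypothetical ValueState variable name
      .load(checkpointDir)

    // Inspect the column layout the reader produces instead of assuming it
    stateDf.printSchema()
    stateDf.show(false)

    spark.stop()
  }
}
```

Printing the schema first is a cheap way to confirm how keys and values are laid out before writing queries against the state rows.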
How was this patch tested?
Added unit tests
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.TransformWithStateSuite, threads: ForkJoinPool.commonPool-worker-4 (daemon=true), Idle Worker Monitor for python3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), ForkJoinPool.commonPool-worker-2 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true) =====
[info] Run completed in 2 minutes, 28 seconds.
[info] Total number of tests run: 42
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 42, failed 0, canceled 0, ignored 1, pending 0
[info] All tests passed.
Was this patch authored or co-authored using generative AI tooling?
No
@jingz-db @HeartSaVioR PTAL, thx!
General comment: the JIRA ticket seems to mention list / map state, while the PR title mentions value state. Which is correct? Please make a correction.
Apologies, the JIRA ticket had the wrong description. Updated it.
@HeartSaVioR @jingz-db could you PTAL? Thx
https://github.com/anishshri-db/spark/actions/runs/10609063268/job/29404193245
[info] *** 2 TESTS FAILED ***
[error] Failed: Total 11562, Failed 2, Errors 0, Passed 11560, Ignored 31, Canceled 1
[error] Failed tests:
[error] org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceNegativeTestSuite
[error] (sql / Test / test) sbt.TestsFailedException: Tests unsuccessful
CI failure seems to be related.
Yes, thanks. Fixed the issue, so it should be resolved now. Will wait for CI to finish.
Thanks! Merging to master.