
[SPARK-49601][SS][PYTHON] Support Initial State Handling for TransformWithStateInPandas


What changes were proposed in this pull request?

This PR lets users provide a DataFrame that is used to instantiate state for the query in the first batch of the arbitrary state API v2 in Python. The Scala PR for supporting initial state is here: https://github.com/apache/spark/pull/45467

We propose a new PythonRunner that handles initial state specifically for TransformWithStateInPandas. On the JVM side, we cogroup input rows and initial state rows on the same grouping key, build a combined row that holds one row from the input iterator and one from the initial state iterator, and send that grouped row over Py4j to the Python worker. Inside the worker, we deserialize the grouped row back into input rows and initial state rows and feed them into handleInitialState and handleInputRows respectively. We launch a Python worker for each partition that has non-empty rows in either the input or the initial state. This guarantees that all keys in the initial state are processed even if they do not appear in the first batch or do not land in the same partition as the first-batch keys.
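The cogrouping behavior described above can be illustrated with a small, self-contained sketch. This is plain Python, not the actual JVM or worker code; `cogroup_by_key` is a hypothetical helper that only demonstrates why initial-state-only keys still get processed:

```python
from collections import defaultdict

def cogroup_by_key(input_rows, initial_state_rows):
    """Group both row streams by key, yielding (key, inputs, initial_states).

    A key is emitted if it appears in either stream, so initial-state keys
    with no first-batch input are still handed to the processor.
    """
    grouped = defaultdict(lambda: ([], []))
    for key, row in input_rows:
        grouped[key][0].append(row)
    for key, row in initial_state_rows:
        grouped[key][1].append(row)
    for key, (inputs, states) in grouped.items():
        yield key, inputs, states

# Example: key "a" has only first-batch input, key "b" has only initial state.
inputs = [("a", {"value": 1}), ("a", {"value": 2})]
init_state = [("b", {"count": 10})]
result = {k: (ins, sts) for k, ins, sts in cogroup_by_key(inputs, init_state)}
# Key "b" is still emitted, with an empty input-row list.
```

In the real runner the grouped rows are serialized to the Python worker rather than yielded in-process, but the key-union semantics are the same.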

Why are the changes needed?

This keeps the Python API at parity with the initial state handling already supported in Scala.

Does this PR introduce any user-facing change?

Yes. This PR introduces a new API on StatefulProcessor that allows users to define their own function for processing initial state:

 def handleInitialState(
     self, key: Any, initialState: "PandasDataFrameLike"
 ) -> None:

Implementing this function is optional. If it is not defined, it acts as a no-op.
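A minimal sketch of how a user-defined processor might override this hook. The `StatefulProcessor` base class and `ValueStateStub` below are simplified stand-ins so the example runs on its own, not the real pyspark.sql.streaming classes; in the real API `initialState` is a pandas DataFrame slice for one grouping key:

```python
from typing import Any, Iterable

class ValueStateStub:
    """Simplified stand-in for a per-key value state handle."""
    def __init__(self):
        self._value = None
    def update(self, value):
        self._value = value
    def get(self):
        return self._value

class CountProcessor:
    """Toy processor: seeds a per-key count from the initial state,
    then increments it for each input row in later batches."""
    def __init__(self):
        self.state = {}  # key -> ValueStateStub

    def handleInitialState(self, key: Any, initialState: Iterable[dict]) -> None:
        # Called once per key in the user-provided initial state DataFrame.
        handle = self.state.setdefault(key, ValueStateStub())
        handle.update(sum(row["count"] for row in initialState))

    def handleInputRows(self, key: Any, rows: Iterable[dict]) -> None:
        handle = self.state.setdefault(key, ValueStateStub())
        handle.update((handle.get() or 0) + len(list(rows)))

p = CountProcessor()
p.handleInitialState("a", [{"count": 10}])  # seeded from initial state
p.handleInputRows("a", [{}, {}])            # two first-batch rows arrive
```

The point of the hook is visible here: state for key "a" starts at the seeded value rather than zero when the first batch is processed.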

How was this patch tested?

Unit tests & integration tests.

Was this patch authored or co-authored using generative AI tooling?

No.

jingz-db avatar Sep 05 '24 23:09 jingz-db