
[SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

Open HeartSaVioR opened this issue 2 years ago • 4 comments

What changes were proposed in this pull request?

This PR proposes to introduce the new API applyInPandasWithState in PySpark, which provides the functionality to perform arbitrary stateful processing in Structured Streaming.

This will be a companion API to applyInPandas: applyInPandas in PySpark covers the use case of flatMapGroups in the Scala/Java API, and applyInPandasWithState in PySpark covers the use case of flatMapGroupsWithState in the Scala/Java API.

The signature of the API is as follows:

# call this function after groupBy
def applyInPandasWithState(
    self,
    func: "PandasGroupedMapFunctionWithState",
    outputStructType: Union[StructType, str],
    stateStructType: Union[StructType, str],
    outputMode: str,
    timeoutConf: str,
) -> DataFrame

and the signature of the user function is as follows:

def func(
    key: Tuple,
    pdf_iter: Iterator[pandas.DataFrame],
    state: GroupStateImpl
) -> Iterator[pandas.DataFrame]

(Please refer to the code diff for the function doc of the new function.)
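To make the user function signature concrete, here is a minimal sketch of a function that keeps a running row count per key. It is an illustration under assumptions, not code from this PR: `FakeGroupState` is a hypothetical stand-in so the function can be exercised without a Spark session, and its `exists`, `get`, and `update` members are modeled on the state handle as I understand it; please check the function doc in the diff for the exact names. The output columns `id` and `count` are also made up for the example.

```python
from typing import Iterator, Optional, Tuple

import pandas as pd


class FakeGroupState:
    """Hypothetical stand-in for the state handle, for local experimentation only."""

    def __init__(self) -> None:
        self._value: Optional[Tuple] = None

    @property
    def exists(self) -> bool:
        return self._value is not None

    @property
    def get(self) -> Tuple:
        if self._value is None:
            raise ValueError("state is not set")
        return self._value

    def update(self, new_value: Tuple) -> None:
        self._value = new_value


def count_rows(
    key: Tuple, pdf_iter: Iterator[pd.DataFrame], state
) -> Iterator[pd.DataFrame]:
    # Start from the previously stored count, if there is one.
    count = state.get[0] if state.exists else 0
    # Consume the group's data one pandas DataFrame at a time.
    for pdf in pdf_iter:
        count += len(pdf)
    # State is a tuple matching the state schema (assumed here: "count long").
    state.update((count,))
    # Output is an iterator of pandas DataFrames matching the output schema.
    yield pd.DataFrame({"id": [key[0]], "count": [count]})


# Simulate two invocations for the same key, sharing one state object.
state = FakeGroupState()
first = list(count_rows(("a",), iter([pd.DataFrame({"id": ["a", "a"]})]), state))
second = list(count_rows(("a",), iter([pd.DataFrame({"id": ["a"]})]), state))
```

In a real query the function would be passed to applyInPandasWithState after groupBy, together with the output schema, the state schema, the output mode, and the timeout configuration.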

Major design choices which differ from existing APIs:

  1. The new API is untyped, while flatMapGroupsWithState is a typed API.

This is based on the nature of the Python language: it relies on duck typing, and a type definition is just a hint. We don't have an implementation of a typed API for PySpark DataFrame.

This leads us to design the API to be untyped, meaning that all types for (input, state, output) must be Row-compatible. While we don't require end users to deal with Row directly, the model they use for state and output must be convertible to Row with the default encoder. If they want a Python type for state that is not compatible with Row (e.g. a custom class), they need to pickle it and use BinaryType to store it.

This requires end users to specify the types of state and output via Spark SQL schemas in the method call.

Note that this helps to ensure compatibility of state data across Spark versions, as long as the encoders for 1) Python type -> Python Row and 2) Python Row -> UnsafeRow are not changed. We won't change the underlying data layout of UnsafeRow, as that would break all existing stateful queries.
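The pickle-and-BinaryType approach mentioned above could look roughly like the following sketch. A Python set stands in for a custom class here, and the schema strings, column names, and helper names (`pack_state`, `unpack_state`) are hypothetical, chosen only for illustration.

```python
import pickle

# Schemas as Spark SQL DDL strings; the column names are illustrative only.
output_schema = "user string, distinct_pages long"
state_schema = "payload binary"  # a single BinaryType column holding pickled bytes


def pack_state(seen_pages: set) -> tuple:
    """Serialize a non-Row-compatible object into a one-column state tuple."""
    return (pickle.dumps(seen_pages),)


def unpack_state(row: tuple) -> set:
    """Recover the Python object from the binary state column."""
    return pickle.loads(row[0])


row = pack_state({"/home", "/cart"})
restored = unpack_state(row)
```

With this approach the state is opaque bytes from Spark's point of view, so keeping the pickled payload readable across upgrades of the user's own code becomes the user's responsibility.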

  2. The new API passes pandas DataFrames to the user function, while flatMapGroupsWithState passes an iterator of rows.

We decided to follow the user experience applyInPandas provides, for both consistency and performance (Arrow batching, vectorization, etc.). This leads us to design the user function to work with pandas DataFrames rather than an iterator of rows. While this makes the UX inconsistent with the Scala/Java API, we don't think this will be a problem, since pandas is considered the de facto standard for Python data scientists.

  3. The new API passes an iterator of pandas DataFrames to the user function and also requires it to return an iterator of pandas DataFrames, to address scalability.

applyInPandas has a known limitation: scalability. It requires all data in a specific group to fit into memory. During the design phase of the new API, we decided to address scalability rather than inherit the limitation.

To address scalability, we tweak the user function to receive an iterator (generator) of pandas DataFrames instead of a single pandas DataFrame, and to return an iterator (generator) of pandas DataFrames as well. We think this does not hurt the UX too much, as a for-each loop and yield are enough to deal with the iterator.

From the implementation perspective, we split the data in a specific group into multiple chunks; each chunk is stored and sent as a single Arrow RecordBatch and then materialized into a single pandas DataFrame. This way, as long as end users don't materialize many pandas DataFrames from the iterator at the same time, only one chunk is materialized in memory at a time, which is scalable. Similar logic applies to the output of the user function, so it is scalable as well.
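The chunking idea can be sketched in plain Python as below. This is only a conceptual model, not the actual implementation in this PR: rows are plain dicts rather than Arrow data, and `chunked` and `chunk_size` are hypothetical names. The point it shows is that a lazy generator keeps at most one chunk alive while the consumer iterates.

```python
from typing import Iterator, List


def chunked(rows: Iterator[dict], chunk_size: int) -> Iterator[List[dict]]:
    """Lazily split one group's rows into chunks of at most chunk_size rows."""
    chunk: List[dict] = []
    for row in rows:
        chunk.append(row)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []  # start a fresh chunk; the previous one can be released
    if chunk:
        yield chunk  # last, possibly smaller, chunk


# The rows of one group, produced lazily.
group_rows = ({"key": "a", "value": i} for i in range(5))

total = 0
chunk_sizes = []
# Consuming with a plain for loop touches one chunk at a time.
for chunk in chunked(group_rows, chunk_size=2):
    chunk_sizes.append(len(chunk))
    total += sum(r["value"] for r in chunk)
```

Collecting the iterator into a list before processing would defeat the purpose, since every chunk would then be held in memory at once.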

  4. The new API also bin-packs the data of multiple groups into a single Arrow RecordBatch.

Given that the API is mainly used for streaming workloads, it is highly likely that the volume of data in a specific group is not large enough to benefit from Arrow columnar batching, which would hurt performance. To address this, we also do the opposite of what we do for scalability: bin-packing. That is, an Arrow RecordBatch can contain data for multiple groups, as well as part of the data for a specific group. This addresses both concerns together, scalability and performance.
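As a rough mental model of how bin-packing and chunking combine, the sketch below fills fixed-size batches with rows from consecutive groups. It is a simplification under assumptions, not the logic in this PR: rows are `(key, value)` tuples instead of Arrow data, and `bin_pack` and `batch_size` are hypothetical names.

```python
from typing import Dict, Iterator, List, Tuple

Row = Tuple[str, int]  # (group key, value)


def bin_pack(groups: Dict[str, List[int]], batch_size: int) -> Iterator[List[Row]]:
    """Fill batches of at most batch_size rows, ignoring group boundaries."""
    batch: List[Row] = []
    for key, values in groups.items():
        for value in values:
            batch.append((key, value))
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:
        yield batch


# Small groups "a" and "c" share batches; the larger group "b" spans two batches.
batches = list(bin_pack({"a": [1], "b": [2, 3, 4, 5], "c": [6]}, batch_size=3))
```

Here the first batch holds all of group "a" plus the start of group "b", and the second holds the rest of "b" plus group "c", so a batch can carry several groups and a group can be split across batches.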

Note that we are not implementing all of the features the Scala/Java API provides in the initial phase, e.g. support for batch queries and support for initial state will be left as TODO.

Why are the changes needed?

PySpark users don't have a way to perform arbitrary stateful processing in Structured Streaming and are forced to use either Java or Scala, which is unacceptable for users in many cases. This PR enables PySpark users to do it without moving to the Java/Scala world.

Does this PR introduce any user-facing change?

Yes. We are exposing a new public API in PySpark which performs arbitrary stateful processing.

How was this patch tested?

N/A. We will make sure test suites are constructed in an E2E manner under SPARK-40431 - #37894

HeartSaVioR avatar Sep 15 '22 04:09 HeartSaVioR

cc. @viirya @HyukjinKwon Please take a look at this. Thanks. I understand this is huge and a bit complicated in some parts, especially the logic around bin-packing/chunking. Please feel free to leave comments if the code comments aren't sufficient to understand it; I'll try my best to cover it.

And please help find more reviewers. I'm not sure who would be available to review a PR touching both PySpark and Structured Streaming, but a review from someone who knows only one of the two areas is still helpful.

HeartSaVioR avatar Sep 16 '22 01:09 HeartSaVioR

Will take a close look next Monday in KST.

HyukjinKwon avatar Sep 16 '22 11:09 HyukjinKwon

I tried to add method-level docs wherever possible, except where I think they're unnecessary (I might still have missed some pieces).

I didn't go with the approach of explaining all of the parameters with types, though, for these reasons:

  • For Python code, it'd be really hard to keep the doc in sync with the code.
  • For Scala/Java, we have been omitting the doc of parameters, or even of an entire class/method, if it's obvious from the name.

In both languages, we strongly encourage method docs and parameter explanations for public APIs. Here we technically add only one public method, applyInPandasWithState in group_ops, and a couple of public classes, GroupState and GroupStateTimeout, in PySpark. Everything else is internal and private.

HeartSaVioR avatar Sep 20 '22 12:09 HeartSaVioR

@HyukjinKwon @alex-balikov Please do another round of review, thanks in advance!

HeartSaVioR avatar Sep 21 '22 07:09 HeartSaVioR

https://github.com/HeartSaVioR/spark/actions/runs/3098349789/jobs/5019498380

BasicSchedulerIntegrationSuite.super simple job
org.scalatest.exceptions.TestFailedException: Map() did not equal Map(0 -> 42, 5 -> 42, 1 -> 42, 6 -> 42, 9 -> 42, 2 -> 42, 7 -> 42, 3 -> 42, 8 -> 42, 4 -> 42)
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752580) 

YarnClusterSuite.run Spark in yarn-client mode with different configurations, ensuring redaction
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.001220665 minutes. Last failure message: handle.getState().isFinal() was false.
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752582) 

YarnClusterSuite.run Spark in yarn-cluster mode with different configurations, ensuring redaction
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.001046460266666 minutes. Last failure message: handle.getState().isFinal() was false.
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752584) 

YarnClusterSuite.yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414, SPARK-23630)
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.0008878969166664 minutes. Last failure message: handle.getState().isFinal() was false.
 [Check failure on line 210 in YarnClusterSuite](https://github.com/HeartSaVioR/spark/commit/f1000487960fa19aff9979211db68e63ec4384e0#annotation_4680752586) 


YarnClusterSuite.SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 190 times over 3.0009406496 minutes. Last failure message: handle.getState().isFinal() was false.

None of the test failures is related to the change in this PR. Since we updated the PR again via dd7a655, let's see the build.

HeartSaVioR avatar Sep 21 '22 20:09 HeartSaVioR

My comments are just nits. I will merge this in first to move forward.

Merged to master.

HyukjinKwon avatar Sep 22 '22 03:09 HyukjinKwon

Thanks @HyukjinKwon and @alex-balikov for the thoughtful reviews and merging! I'll handle the latest comments in a follow-up PR.

HeartSaVioR avatar Sep 22 '22 03:09 HeartSaVioR