spark icon indicating copy to clipboard operation
spark copied to clipboard

[POC][PYTHON][SS] Arbitrary stateful processing in Structured Streaming with Python

Open HyukjinKwon opened this issue 2 years ago • 1 comments

What changes were proposed in this pull request?

This PR adds the Python version of Dataset.groupByKey(...).flatMapGroupsWithState(...) that is DataFrame.groupby(...).applyInPandasWithState(...) in PySpark.

TBD

Note that documentation will be done in a separate PR given the size of the PR.

Why are the changes needed?

TBD

Does this PR introduce any user-facing change?

Yes, this PR adds a new API DataFrame.groupby(...).applyInPandasWithState(...) in PySpark.

import typing

import pandas as pd

from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql.streaming.state import GroupStateTimeout, GroupStateImpl

output_type = StructType([
    StructField("key", LongType()),
    StructField("countAsString", StringType())])
state_type = StructType([StructField("count", LongType())])

# Type hints are optional in `func`.
def func(key: typing.Tuple, pdf: pd.DataFrame, state: GroupStateImpl) -> pd.DataFrame:
    count = state.getOption
    if count is None:
        count = 0
    else:
        count = count[0]
    count += len(pdf)
    state.update((count,))
    return pd.DataFrame({'key': [key[0]], 'countAsString': [str(count)]})


df = spark.readStream.format("rate").option("rowsPerSecond", 10).load().selectExpr("value % 3 as v")
df.groupBy(df["v"]).applyInPandasWithState(
    func, output_type, state_type, "Update", GroupStateTimeout.NoTimeout
).writeStream.format("console").queryName("test").start()

How was this patch tested?

Manually tested, unittests and e2e tests were added in both Python and Scala sides.

HyukjinKwon avatar Jul 26 '22 04:07 HyukjinKwon

cc @viirya and @ueshin FYI who I think are the best ones who can review. I will attach some design docs, etc soon around next week likely.

HyukjinKwon avatar Jul 27 '22 23:07 HyukjinKwon

We can close this now.

HeartSaVioR avatar Sep 26 '22 02:09 HeartSaVioR