[POC][PYTHON][SS] Arbitrary stateful processing in Structured Streaming with Python
What changes were proposed in this pull request?
This PR adds the Python counterpart of `Dataset.groupByKey(...).flatMapGroupsWithState(...)`, namely `DataFrame.groupby(...).applyInPandasWithState(...)`, in PySpark.
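Conceptually, for each micro-batch the engine groups the input rows by key, passes each group and that key's stored state to the user function, and persists whatever state the function writes back for the next micro-batch. The following is a plain-Python model of that loop, not PySpark code. `SimpleGroupState`, `run_micro_batch` and `count_func` are hypothetical names for illustration. The `getOption` property and `update` method only mirror how the example in this PR uses the state handle. Timeouts and output modes are not modeled.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, Iterable, List, Optional, Tuple


class SimpleGroupState:
    """Hypothetical stand-in for the per-key state handle; not a PySpark class."""

    def __init__(self, value: Optional[Tuple]) -> None:
        self._value = value

    @property
    def getOption(self) -> Optional[Tuple]:
        # The current state tuple, or None if this key has no state.
        return self._value

    def update(self, new_value: Tuple) -> None:
        self._value = new_value

    def remove(self) -> None:
        self._value = None


def run_micro_batch(
    rows: Iterable[Tuple[Hashable, Any]],
    func: Callable[[Tuple, List[Any], SimpleGroupState], List[Any]],
    store: Dict[Hashable, Tuple],
) -> List[Any]:
    """Group one micro-batch by key, call `func` once per key, persist state."""
    groups: Dict[Hashable, List[Any]] = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)

    output: List[Any] = []
    for key, values in groups.items():
        state = SimpleGroupState(store.get(key))
        output.extend(func((key,), values, state))
        if state.getOption is None:
            store.pop(key, None)  # state removed (or never set) for this key
        else:
            store[key] = state.getOption  # carried over to the next micro-batch
    return output


def count_func(key: Tuple, values: List[Any], state: SimpleGroupState) -> List[Any]:
    # Same logic as the `func` example in this PR, using lists instead of pandas.
    current = state.getOption
    count = (0 if current is None else current[0]) + len(values)
    state.update((count,))
    return [(key[0], str(count))]


store: Dict[Hashable, Tuple] = {}
print(run_micro_batch([(0, "a"), (1, "b"), (0, "c")], count_func, store))  # [(0, '2'), (1, '1')]
print(run_micro_batch([(0, "d")], count_func, store))                      # [(0, '3')]
```

The state is handed to the function as a mutable handle rather than returned as a value. This lets the function read, update, or remove state independently of the rows it emits.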
TBD
Note that documentation will be added in a separate PR, given the size of this one.
Why are the changes needed?
TBD
Does this PR introduce any user-facing change?
Yes, this PR adds a new API, `DataFrame.groupby(...).applyInPandasWithState(...)`, in PySpark. For example:
```python
import typing

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql.streaming.state import GroupStateTimeout, GroupStateImpl

# `spark` is predefined in the PySpark shell; create it when running as a script.
spark = SparkSession.builder.getOrCreate()

# Schema of the rows returned by `func`.
output_type = StructType([
    StructField("key", LongType()),
    StructField("countAsString", StringType())])
# Schema of the per-key state: a running count.
state_type = StructType([StructField("count", LongType())])

# Type hints are optional in `func`.
def func(key: typing.Tuple, pdf: pd.DataFrame, state: GroupStateImpl) -> pd.DataFrame:
    # `state.getOption` is None when this key has no state yet; otherwise it is the state tuple.
    count = state.getOption
    if count is None:
        count = 0
    else:
        count = count[0]
    count += len(pdf)
    state.update((count,))
    return pd.DataFrame({'key': [key[0]], 'countAsString': [str(count)]})

# Key each row of the rate source by `value % 3`.
df = spark.readStream.format("rate").option("rowsPerSecond", 10).load().selectExpr("value % 3 as v")
df.groupBy(df["v"]).applyInPandasWithState(
    func, output_type, state_type, "Update", GroupStateTimeout.NoTimeout
).writeStream.format("console").queryName("test").start()
```
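Because the state holds a running count per key, the `countAsString` value eventually shown for a key depends only on how many rows with that key have been processed. It does not depend on how those rows were split into micro-batches. The following plain-Python sketch replays that arithmetic, assuming the rate source emits consecutive `value`s starting at 0. `expected_counts` is a hypothetical helper, not part of PySpark. It tracks only the latest count per key, not the exact rows the console sink prints for each batch in Update mode.

```python
from collections import Counter
from typing import Dict, List


def expected_counts(batches: List[List[int]]) -> Dict[int, str]:
    """Replay the example's running count over hypothetical micro-batches.

    Each inner list holds rate-source `value`s; rows are keyed by `value % 3`,
    mirroring `selectExpr("value % 3 as v")`. Returns the latest countAsString
    emitted for each key.
    """
    counts: Dict[int, int] = {}  # plays the role of the per-key ("count",) state
    latest: Dict[int, str] = {}
    for batch in batches:
        for key, n in Counter(value % 3 for value in batch).items():
            counts[key] = counts.get(key, 0) + n
            latest[key] = str(counts[key])
    return latest


first_second = list(range(10))  # roughly one second of rows at rowsPerSecond=10
print(expected_counts([first_second]))                        # {0: '4', 1: '3', 2: '3'}
print(expected_counts([first_second[:4], first_second[4:]]))  # same counts, two batches
```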
How was this patch tested?
Tested manually. Unit tests and end-to-end tests were added on both the Python and Scala sides.
cc @viirya and @ueshin FYI, who I think are the best people to review this. I will likely attach design docs and related material next week.
We can close this now.