[SPARK-48755] State V2 base implementation and ValueState support
What changes were proposed in this pull request?
- Base implementation for Python State V2
- Implemented ValueState
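For readers new to the API, the value-state semantics exercised by the integration test below (update, exists, get, clear, each scoped to the current grouping key) can be modeled with a small in-memory stand-in. This is a hypothetical sketch for illustration only. `InMemoryValueState` and its `current_key` attribute are assumptions, not PySpark names, and returning `None` from `get()` for a missing value is this sketch's choice. The real implementation forwards each call to the JVM state server over a socket instead of using a dict.

```python
from typing import Any, Dict, Hashable, Optional


class InMemoryValueState:
    """Hypothetical stand-in for a Python ValueState (not the PySpark class).

    Values are stored per grouping key, mirroring how the state server scopes
    every call to the "implicit key" that is set before rows are handled.
    """

    def __init__(self, state_name: str) -> None:
        self.state_name = state_name
        # Assumption: the caller sets this per group, like setImplicitKey in the logs.
        self.current_key: Optional[Hashable] = None
        self._store: Dict[Hashable, Any] = {}

    def _key(self) -> Hashable:
        if self.current_key is None:
            raise RuntimeError("implicit grouping key is not set")
        return self.current_key

    def exists(self) -> bool:
        return self._key() in self._store

    def get(self) -> Optional[Any]:
        # Returns None when nothing is stored for the current key.
        return self._store.get(self._key())

    def update(self, value: Any) -> None:
        self._store[self._key()] = value

    def clear(self) -> None:
        self._store.pop(self._key(), None)


if __name__ == "__main__":
    # Same call sequence as the test's handleInputRows for key 11.
    state = InMemoryValueState("testValueState")
    state.current_key = 11
    state.update("test_value")
    print(state.exists(), state.get())  # True test_value
    state.clear()
    print(state.exists())  # False
```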
Why are the changes needed?
Support Python State V2 API
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
Ran a local integration test with the following code:
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator

# Assumes an active SparkSession named `spark`, e.g. in the pyspark shell.
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.shuffle.partitions", "1")

output_schema = StructType([
    StructField("value", LongType(), True)
])
state_schema = StructType([
    StructField("value", StringType(), True)
])

class SimpleStatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.value_state = handle.getValueState("testValueState", state_schema)

    def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
        # Exercise every ValueState method once per key.
        self.value_state.update("test_value")
        exists = self.value_state.exists()
        print(f"value state exists: {exists}")
        value = self.value_state.get()
        print(f"get value: {value}")
        print("clearing value state")
        self.value_state.clear()
        print("value state cleared")
        return rows

    def close(self) -> None:
        pass

q = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", "1")
    .option("numPartitions", "1")
    .load()
    .groupBy("value")
    .transformWithStateInPandas(
        stateful_processor=SimpleStatefulProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream.format("console")
    .option("checkpointLocation", "/tmp/streaming/temp_ckp")
    .outputMode("update")
    .start()
)
Verified from the logs that the value state methods work as expected for key 11:
handling input rows for key: 11
setting implicit key: 11
sending message -- len = 8 b'"\x06\n\x04\n\x0211'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 8 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(34, 6, 10, 4, 10, 2, 49, 49)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = implicitGroupingKeyRequest {
  setImplicitKey {
    key: "11"
  }
}
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: setting implicit key to 11 with type class java.lang.Long
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
setImplicitKey status= 0
updating value state: testValueState
sending message -- len = 127 b'\x1a}\n{\x1ay\n\x0etestValueState\x12[{"fields":[{"metadata":{},"name":"value","nullable":true,"type":"string"}],"type":"struct"}\x1a\ntest_value'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 127 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 125, 10, 123, 26, 121, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101, 18, 91, 123, 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 118, 97, 108, 117, 101, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 93, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 117, 99, 116, 34, 125, 26, 10, 116, 101, 115, 116, 95, 118, 97, 108, 117, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    update {
      stateName: "testValueState"
      schema: "{\"fields\":[{\"metadata\":{},\"name\":\"value\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"
      value: "test_value"
    }
  }
}
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: updating state testValueState with value test_value and type class java.lang.String
24/06/27 15:21:40 WARN StateTypesEncoder: Serializing grouping key: [11]
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateUpdate status= 0
checking value state exists: testValueState
sending message -- len = 22 b'\x1a\x14\n\x12\n\x10\n\x0etestValueState'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 22 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 20, 10, 18, 10, 16, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    exists {
      stateName: "testValueState"
    }
  }
}
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: state testValueState exists
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateExists status= 0
value state exists: True
getting value state: testValueState
sending message -- len = 22 b'\x1a\x14\n\x12\x12\x10\n\x0etestValueState'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 22 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 20, 10, 18, 18, 16, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    get {
      stateName: "testValueState"
    }
  }
}
24/06/27 15:21:40 WARN StateTypesEncoder: Serializing grouping key: [11]
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: got state value test_value
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: writing value bytes of length 10
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: writing value bytes: Array(116, 101, 115, 116, 95, 118, 97, 108, 117, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateGet status= 0
get value: test_value
clearing value state
clearing value state: testValueState
sending message -- len = 22 b'\x1a\x14\n\x12"\x10\n\x0etestValueState'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 22 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 20, 10, 18, 34, 16, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    clear {
      stateName: "testValueState"
    }
  }
}
24/06/27 15:21:40 WARN StateTypesEncoder: Serializing grouping key: [11]
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateClear status= 0
value state cleared
removing implicit key
sending message -- len = 4 b'"\x02\x12\x00'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 4 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(34, 2, 18, 0)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = implicitGroupingKeyRequest {
  removeImplicitKey {
  }
}
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: removing implicit key
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: removed implicit key
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
removeImplicitKey status= 0
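The byte dumps in the logs above can be decoded by hand. The following is a hedged sketch that assumes the framing suggested by the server log ("reading the version", then "parsing a message of N bytes"): the client writes a 4-byte message version (0), then a 4-byte payload length, then the serialized protobuf request, all as big-endian ints like those produced by `pyspark.serializers.write_int`. The helpers `frame_message`, `read_frame`, and `walk_fields` are illustrative names, not PySpark functions. The walker handles only the varint and length-delimited wire types that appear in these messages. The field numbers in the comments are read off the logged bytes: tag byte 34 is field 4 (`implicitGroupingKeyRequest`), and tag byte 26 is field 3 (`stateVariableRequest`).

```python
import io
import struct
from typing import BinaryIO, List, Tuple, Union


def frame_message(payload: bytes, version: int = 0) -> bytes:
    # Assumed framing: 4-byte version, 4-byte length, then the payload (big-endian).
    return struct.pack("!i", version) + struct.pack("!i", len(payload)) + payload


def read_frame(stream: BinaryIO) -> Tuple[int, bytes]:
    header = stream.read(8)
    if len(header) < 8:
        raise EOFError("stream ended before a full frame header")
    version, length = struct.unpack("!ii", header)
    return version, stream.read(length)


def _read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    # Protobuf varints: 7 payload bits per byte, high bit set on all but the last.
    result, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7


def walk_fields(buf: bytes) -> List[Tuple[int, Union[int, bytes]]]:
    """Return (field_number, value) pairs for one level of a protobuf message."""
    fields: List[Tuple[int, Union[int, bytes]]] = []
    pos = 0
    while pos < len(buf):
        tag, pos = _read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x7
        if wire_type == 0:  # varint
            value, pos = _read_varint(buf, pos)
            fields.append((field_number, value))
        elif wire_type == 2:  # length-delimited: string, bytes, or nested message
            length, pos = _read_varint(buf, pos)
            fields.append((field_number, buf[pos:pos + length]))
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} is not handled in this sketch")
    return fields


if __name__ == "__main__":
    payload = b'"\x06\n\x04\n\x0211'  # setImplicitKey for key 11, from the logs
    version, body = read_frame(io.BytesIO(frame_message(payload)))
    request = walk_fields(body)           # field 4: implicitGroupingKeyRequest
    set_key = walk_fields(request[0][1])  # field 1: setImplicitKey
    print(walk_fields(set_key[0][1]))     # field 1: key -> [(1, b'11')]
```

The same walk shows that the exists, get, and clear requests differ only in one tag byte (10, 18, 34), which selects field 1, 2, or 4 inside `valueStateCall`.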
Will add unit tests.
Was this patch authored or co-authored using generative AI tooling?
No
Mind filing a JIRA?
Yeah, will do, thanks!
@bogao007 - this PR is doing a lot. Could we please add at least a high-level description of what files are being added, how they are being used, and what features will work after this change is merged? Thx
Sure, will do.
@bogao007 - the test failure seems related?
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:51:12: class TransformWithStateInPandasExec needs to be abstract.
[error] Missing implementation for member of trait StatefulOperator:
[error] def validateAndMaybeEvolveStateSchema(hadoopConf: org.apache.hadoop.conf.Configuration, batchId: Long, stateSchemaVersion: Int): Array[String] = ???
[error] case class TransformWithStateInPandasExec(
Yeah, that's because I didn't merge the latest master into this PR. I will fix it, thanks!
@bogao007 - the license error seems related?
Could not find Apache license headers in the following files:
!????? /__w/spark/spark/python/pyspark/sql/streaming/StateMessage_pb2.py
!????? /__w/spark/spark/python/pyspark/sql/streaming/StateMessage_pb2.pyi
!????? /__w/spark/spark/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/StateMessage.proto
!????? /__w/spark/spark/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java
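As a rough local pre-check before pushing, one can scan the new files for the standard ASF header text. This is a hypothetical helper and only an approximation. It is not Apache RAT, the tool whose "!?????" report appears above. The marker string assumes the first line of the usual ASF source header, and the 30-line scan window is an arbitrary choice.

```python
from pathlib import Path
from typing import Iterable, List, Union

# Assumption: the first line of the standard ASF source header.
ASF_MARKER = "Licensed to the Apache Software Foundation (ASF)"


def has_asf_header(text: str, scan_lines: int = 30) -> bool:
    """True if the ASF header marker appears within the first `scan_lines` lines."""
    head = "\n".join(text.splitlines()[:scan_lines])
    return ASF_MARKER in head


def files_missing_header(
    paths: Iterable[Union[str, Path]], scan_lines: int = 30
) -> List[Path]:
    """Return the paths whose leading lines lack the ASF header marker."""
    missing: List[Path] = []
    for p in paths:
        path = Path(p)
        text = path.read_text(encoding="utf-8", errors="replace")
        if not has_asf_header(text, scan_lines):
            missing.append(path)
    return missing


if __name__ == "__main__":
    # Example: check the generated protobuf files flagged above.
    generated = Path("python/pyspark/sql/streaming").glob("StateMessage_pb2.py*")
    for path in files_missing_header(generated):
        print(f"missing ASF header: {path}")
```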
@bogao007 - are the pyspark-core errors also relevant? Could you PTAL? Thx
Hi @HyukjinKwon, I'm getting the following test errors with my PR:
======================================================================
ERROR [0.255s]: test_termination_sigterm (pyspark.tests.test_daemon.DaemonTests.test_termination_sigterm)
Ensure that daemon and workers terminate on SIGTERM.
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/tests/test_daemon.py", line 77, in test_termination_sigterm
    self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
  File "/__w/spark/spark/python/pyspark/tests/test_daemon.py", line 49, in do_termination_test
    port = read_int(daemon.stdout)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/serializers.py", line 597, in read_int
    raise EOFError
EOFError
But if I revert the changes in python/pyspark/sql/pandas/group_ops.py and python/pyspark/sql/streaming/__init__.py, the test succeeds. Do you know if this is related to my newly added file stateful_processor.py? I'm not sure why it would be, though.
The test failure doesn't look related to me. Can you reproduce it locally?
Yeah, I can reproduce it locally, which is weird... And after I reverted the changes in python/pyspark/sql/pandas/group_ops.py and python/pyspark/sql/streaming/__init__.py, it succeeded. So I wonder if something is wrong in stateful_processor.py, since both files import it.
@HyukjinKwon I'm getting some other dependency errors in the tests running on YARN and k8s:
[info] - run Python application in yarn-client mode *** FAILED *** (4 seconds, 30 milliseconds)
[info] FAILED did not equal FINISHED WARNING: Using incubator modules: jdk.incubator.vector
[info] 23:36:43.052 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[info]
[info] Traceback (most recent call last):
[info] File "/home/runner/work/spark/spark/target/tmp/spark-f738b923-9285-4c53-8678-bae00510c9ff/test.py", line 6, in <module>
[info] from pyspark import SparkConf , SparkContext
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 129, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/__init__.py", line 43, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 37, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 44, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 42, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/__init__.py", line 21, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py", line 21, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py", line 23, in <module>
[info] File "<frozen zipimport>", line 259, in load_module
[info] File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/StateMessage_pb2.py", line 23, in <module>
[info] ModuleNotFoundError: No module named 'google'
Do you know where I can specify the new dependency I added? I found this blog, but it might not be related to the unit test failure.
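The traceback above shows that `import pyspark` itself fails, because `pyspark/sql/streaming/__init__.py` imports `stateful_processor.py`, which imports `stateful_processor_api_client.py`, which imports the generated `StateMessage_pb2.py`, and that module needs the `google` (protobuf) package. One common way to avoid this is to import the optional dependency lazily, inside the code path that needs it, behind a guard that raises a clear error. The helper below is a hypothetical sketch, not an existing PySpark function. The actual error class and packaging that PySpark uses for this case may differ.

```python
import importlib
from types import ModuleType


def require_module(module_name: str, install_hint: str) -> ModuleType:
    """Import `module_name`, or raise ImportError with an actionable message.

    Call this inside the function that needs the dependency (not at module
    top level), so that importing the package still works when it is absent.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as e:  # also covers ModuleNotFoundError
        raise ImportError(
            f"{module_name} is required for this feature but is not installed; "
            f"try: {install_hint}"
        ) from e


if __name__ == "__main__":
    try:
        protobuf = require_module("google.protobuf", "pip install protobuf")
        print("protobuf available:", protobuf.__name__)
    except ImportError as err:
        print(err)
```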
@HyukjinKwon I fixed the dependency issue, could you help take another look? Thanks!
Let's get this reviewed by @hvanhovell @grundprinzip @zhengruifeng and @ueshin too
@bogao007 which exception do we get now if protobuf dependency is not installed?
@HyukjinKwon could you help take another look? Thanks!
Looks fine at high level
Thanks @HyukjinKwon! I addressed your comments, could you help take another look?
I defer to @HeartSaVioR. I don't have any high-level concerns.
Let's call this out explicitly as transformWithState, now that the API name is finalized.
I'm going to merge as we have TODO tickets and all others look OK.
Thanks! Merging to master.
+1
Please leave a comment listing all JIRA tickets for TODOs, for record/reference.
Thanks a lot @HeartSaVioR! Here are the TODOs related to this PR: https://issues.apache.org/jira/browse/SPARK-49233 https://issues.apache.org/jira/browse/SPARK-49100 https://issues.apache.org/jira/browse/SPARK-49212