
[SPARK-48755] State V2 base implementation and ValueState support

Open · bogao007 opened this pull request 1 year ago

What changes were proposed in this pull request?

  • Base implementation for Python State V2
  • Implemented ValueState (interface sketched below)
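
For orientation, a minimal sketch of the ValueState handle shape as exercised by the integration test below; the method names come from that example, while the signatures and docstrings are assumptions, not the PR's exact code:

from abc import ABC, abstractmethod
from typing import Any

class ValueState(ABC):
    """A single value of state, scoped to the current grouping key (assumed shape)."""

    @abstractmethod
    def exists(self) -> bool:
        """Whether a value is currently set for the grouping key."""

    @abstractmethod
    def get(self) -> Any:
        """Return the current value, or None if unset."""

    @abstractmethod
    def update(self, new_value: Any) -> None:
        """Set or overwrite the value for the grouping key."""

    @abstractmethod
    def clear(self) -> None:
        """Remove the value for the grouping key."""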

Why are the changes needed?

Support Python State V2 API

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Ran a local integration test with the command below:

import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.shuffle.partitions","1")
output_schema = StructType([
    StructField("value", LongType(), True)
])
state_schema = StructType([
    StructField("value", StringType(), True)
])

class SimpleStatefulProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    self.value_state = handle.getValueState("testValueState", state_schema)
  def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
    self.value_state.update("test_value")
    exists = self.value_state.exists()
    print(f"value state exists: {exists}")
    value = self.value_state.get()
    print(f"get value: {value}")
    print("clearing value state")
    self.value_state.clear()
    print("value state cleared")
    return rows
  def close(self) -> None:
    pass

q = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", "1")
    .option("numPartitions", "1")
    .load()
    .groupBy("value")
    .transformWithStateInPandas(
        stateful_processor=SimpleStatefulProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream.format("console")
    .option("checkpointLocation", "/tmp/streaming/temp_ckp")
    .outputMode("update")
    .start()
)

Verified from the logs that the value state methods work as expected for key 11:

handling input rows for key: 11
setting implicit key: 11
sending message -- len = 8 b'"\x06\n\x04\n\x0211'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 8 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(34, 6, 10, 4, 10, 2, 49, 49)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = implicitGroupingKeyRequest {
  setImplicitKey {
    key: "11"
  }
}

24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: setting implicit key to 11 with type class java.lang.Long
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
setImplicitKey status= 0
updating value state: testValueState
sending message -- len = 127 b'\x1a}\n{\x1ay\n\x0etestValueState\x12[{"fields":[{"metadata":{},"name":"value","nullable":true,"type":"string"}],"type":"struct"}\x1a\ntest_value'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 127 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 125, 10, 123, 26, 121, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101, 18, 91, 123, 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 118, 97, 108, 117, 101, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 93, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 117, 99, 116, 34, 125, 26, 10, 116, 101, 115, 116, 95, 118, 97, 108, 117, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    update {
      stateName: "testValueState"
      schema: "{\"fields\":[{\"metadata\":{},\"name\":\"value\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"
      value: "test_value"
    }
  }
}

24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: updating state testValueState with value test_value and type class java.lang.String
24/06/27 15:21:40 WARN StateTypesEncoder: Serializing grouping key: [11]
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateUpdate status= 0
checking value state exists: testValueState
sending message -- len = 22 b'\x1a\x14\n\x12\n\x10\n\x0etestValueState'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 22 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 20, 10, 18, 10, 16, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    exists {
      stateName: "testValueState"
    }
  }
}

24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: state testValueState exists
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateExists status= 0
value state exists: True
getting value state: testValueState
sending message -- len = 22 b'\x1a\x14\n\x12\x12\x10\n\x0etestValueState'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 22 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 20, 10, 18, 18, 16, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    get {
      stateName: "testValueState"
    }
  }
}

24/06/27 15:21:40 WARN StateTypesEncoder: Serializing grouping key: [11]
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: got state value test_value
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: writing value bytes of length 10
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: writing value bytes: Array(116, 101, 115, 116, 95, 118, 97, 108, 117, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateGet status= 0
get value: test_value
clearing value state
clearing value state: testValueState
sending message -- len = 22 b'\x1a\x14\n\x12"\x10\n\x0etestValueState'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 22 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(26, 20, 10, 18, 34, 16, 10, 14, 116, 101, 115, 116, 86, 97, 108, 117, 101, 83, 116, 97, 116, 101)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = stateVariableRequest {
  valueStateCall {
    clear {
      stateName: "testValueState"
    }
  }
}

24/06/27 15:21:40 WARN StateTypesEncoder: Serializing grouping key: [11]
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: reading the version
valueStateClear status= 0
value state cleared
removing implicit key
sending message -- len = 4 b'"\x02\x12\x00'
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: version = 0
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: parsing a message of 4 bytes
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read bytes = Array(34, 2, 18, 0)
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: read message = implicitGroupingKeyRequest {
  removeImplicitKey {
  }
}

24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: removing implicit key
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: removed implicit key
24/06/27 15:21:40 WARN TransformWithStateInPandasStateServer: flush output stream
removeImplicitKey status= 0
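
The traces above show the request/response pattern between the Python worker and TransformWithStateInPandasStateServer: each request is a serialized protobuf message preceded by its length, and the server answers with an integer status (0 = OK). A hypothetical helper illustrating that framing, not the PR's actual code:

import struct

def send_proto_message(outfile, serialized: bytes) -> None:
    # Length-prefix the serialized protobuf message, as in the
    # "sending message -- len = N" lines above (4-byte big-endian assumed).
    outfile.write(struct.pack(">i", len(serialized)))
    outfile.write(serialized)
    outfile.flush()

def read_status(infile) -> int:
    # The server replies with an integer status; 0 means success,
    # matching the "... status= 0" lines above.
    return struct.unpack(">i", infile.read(4))[0]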

Will add unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

bogao007 avatar Jun 27 '24 22:06 bogao007

Mind filing a JIRA?

HyukjinKwon avatar Jun 28 '24 02:06 HyukjinKwon

Mind filing a JIRA?

Yeah, will do, thanks!

bogao007 avatar Jun 28 '24 02:06 bogao007

@bogao007 - this PR is doing a lot. Could we please at least add a high-level description of what files are being added, how they are being used, and what features will work after this change is merged? Thx

anishshri-db avatar Jul 16 '24 20:07 anishshri-db

@bogao007 - this PR is doing a lot. Could we please at least add a high-level description of what files are being added, how they are being used, and what features will work after this change is merged? Thx

Sure, will do.

bogao007 avatar Jul 16 '24 20:07 bogao007

@bogao007 - test failure seems related?

[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:51:12: class TransformWithStateInPandasExec needs to be abstract.
[error] Missing implementation for member of trait StatefulOperator:
[error]   def validateAndMaybeEvolveStateSchema(hadoopConf: org.apache.hadoop.conf.Configuration, batchId: Long, stateSchemaVersion: Int): Array[String] = ???
[error] case class TransformWithStateInPandasExec(

anishshri-db avatar Jul 16 '24 21:07 anishshri-db

@bogao007 - test failure seems related?

[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:51:12: class TransformWithStateInPandasExec needs to be abstract.
[error] Missing implementation for member of trait StatefulOperator:
[error]   def validateAndMaybeEvolveStateSchema(hadoopConf: org.apache.hadoop.conf.Configuration, batchId: Long, stateSchemaVersion: Int): Array[String] = ???
[error] case class TransformWithStateInPandasExec(

Yeah, that's because I didn't merge the latest master into this PR. I will fix it, thanks!

bogao007 avatar Jul 16 '24 21:07 bogao007

@bogao007 - license error seems related?

Could not find Apache license headers in the following files:
 !????? /__w/spark/spark/python/pyspark/sql/streaming/StateMessage_pb2.py
 !????? /__w/spark/spark/python/pyspark/sql/streaming/StateMessage_pb2.pyi
 !????? /__w/spark/spark/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/StateMessage.proto
 !????? /__w/spark/spark/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java

anishshri-db avatar Jul 25 '24 05:07 anishshri-db
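
For reference, the Apache RAT check expects the standard ASF header at the top of each source file; generated files such as StateMessage_pb2.py can typically be listed in dev/.rat-excludes instead. The header in its Python-comment form:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#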

@bogao007 - pyspark-core errors also seem relevant? Could you PTAL? Thx

anishshri-db avatar Jul 25 '24 05:07 anishshri-db

Hi @HyukjinKwon, I'm getting the test errors below with my PR:

======================================================================
ERROR [0.255s]: test_termination_sigterm (pyspark.tests.test_daemon.DaemonTests.test_termination_sigterm)
Ensure that daemon and workers terminate on SIGTERM.
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/tests/test_daemon.py", line 77, in test_termination_sigterm
    self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
  File "/__w/spark/spark/python/pyspark/tests/test_daemon.py", line 49, in do_termination_test
    port = read_int(daemon.stdout)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/serializers.py", line 597, in read_int
    raise EOFError
EOFError

But if I revert the changes in python/pyspark/sql/pandas/group_ops.py and python/pyspark/sql/streaming/__init__.py, it succeeds. Do you know if this could be related to my newly added file stateful_processor.py? I'm not sure why it would be, though.

bogao007 avatar Jul 25 '24 22:07 bogao007

The test failure doesn't look related to me. Can you reproduce it locally?

HyukjinKwon avatar Jul 26 '24 01:07 HyukjinKwon

The test failure doesn't look related to me. Can you reproduce it locally?

Yeah, I can reproduce it locally, which is weird... And after I reverted the changes in python/pyspark/sql/pandas/group_ops.py and python/pyspark/sql/streaming/__init__.py, it succeeded. So I wonder if there's something wrong in stateful_processor.py, since both files import it.

bogao007 avatar Jul 26 '24 04:07 bogao007

@HyukjinKwon I got some other dependency errors for tests running in yarn and k8s

[info] - run Python application in yarn-client mode *** FAILED *** (4 seconds, 30 milliseconds)
[info]   FAILED did not equal FINISHED WARNING: Using incubator modules: jdk.incubator.vector
[info]   23:36:43.052 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[info]   
[info]   Traceback (most recent call last):
[info]     File "/home/runner/work/spark/spark/target/tmp/spark-f738b923-9285-4c53-8678-bae00510c9ff/test.py", line 6, in <module>
[info]       from pyspark import SparkConf , SparkContext
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 129, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/__init__.py", line 43, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 37, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 44, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 42, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/__init__.py", line 21, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py", line 21, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py", line 23, in <module>
[info]     File "<frozen zipimport>", line 259, in load_module
[info]     File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/StateMessage_pb2.py", line 23, in <module>
[info]   ModuleNotFoundError: No module named 'google'

Do you know where I can specify the new dependency I added? I found this blog, but it might not be related to the unit test failure.

bogao007 avatar Jul 26 '24 21:07 bogao007
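
One common remedy for this class of failure, sketched here as an assumption rather than what the PR actually did, is to defer the generated-protobuf import so that importing pyspark.sql.streaming does not hard-require the protobuf package, and to fail with an actionable message when it is missing:

def _load_state_message_pb2():
    # Lazy import: the generated StateMessage_pb2 module pulls in
    # google.protobuf, which the base install may not have (hence the
    # "No module named 'google'" error above).
    try:
        from pyspark.sql.streaming import StateMessage_pb2
    except ImportError as e:
        raise ImportError(
            "transformWithStateInPandas requires the 'protobuf' package; "
            "install it with 'pip install protobuf'."
        ) from e
    return StateMessage_pb2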

@HyukjinKwon I fixed the dependency issue, could you help take another look? Thanks!

bogao007 avatar Jul 31 '24 00:07 bogao007

Let's get this reviewed by @hvanhovell @grundprinzip @zhengruifeng and @ueshin too

HyukjinKwon avatar Aug 02 '24 03:08 HyukjinKwon

@bogao007 which exception do we get now if the protobuf dependency is not installed?

HyukjinKwon avatar Aug 02 '24 03:08 HyukjinKwon

@HyukjinKwon could you help take another look? Thanks!

bogao007 avatar Aug 10 '24 00:08 bogao007

Looks fine at a high level

Thanks @HyukjinKwon! I addressed your comments, could you help take another look?

bogao007 avatar Aug 12 '24 00:08 bogao007

I defer to @HeartSaVioR. I don't have any high-level concerns.

HyukjinKwon avatar Aug 12 '24 02:08 HyukjinKwon

Let's call this out as transformWithState explicitly, now that we have finalized the name of the API.

HeartSaVioR avatar Aug 12 '24 13:08 HeartSaVioR

I'm going to merge, as we have TODO tickets for the remaining work and everything else looks OK.

Thanks! Merging to master.

HeartSaVioR avatar Aug 15 '24 13:08 HeartSaVioR

+1

Please leave a comment listing all JIRA tickets for TODOs, for record/reference.

Thanks a lot @HeartSaVioR! Here are the TODOs related to this PR: https://issues.apache.org/jira/browse/SPARK-49233 https://issues.apache.org/jira/browse/SPARK-49100 https://issues.apache.org/jira/browse/SPARK-49212

bogao007 avatar Aug 15 '24 17:08 bogao007