Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
b1175e4
[WIP] Poc.
sahnib Jan 22, 2024
0a98ed8
Introduce Protobuf.
sahnib Feb 6, 2024
8e2b193
Fixing things.
sahnib Feb 6, 2024
16e4c17
support timeMode for python state v2 API
bogao007 Jun 20, 2024
92ef716
Add protobuf for serde
sahnib Feb 13, 2024
c3eaf38
protobuf change
bogao007 Jun 20, 2024
609d94e
Initial commit
bogao007 Jun 27, 2024
a27f9d9
better error handling, support value state with different types
bogao007 Jun 28, 2024
684939b
addressed comments
bogao007 Jul 3, 2024
7f65fbd
fix
bogao007 Jul 3, 2024
c25d7da
Added support for unix domain socket
bogao007 Jul 11, 2024
9c8c616
removed unrelated log lines, addressed part of the comments
bogao007 Jul 17, 2024
c641192
fix
bogao007 Jul 17, 2024
8d3da4e
Addressed comments
bogao007 Jul 19, 2024
cc9bf95
removed unnecessary print
bogao007 Jul 19, 2024
f7df2dc
rename
bogao007 Jul 19, 2024
27cd169
fix
bogao007 Jul 19, 2024
3b5b3e5
removed duplicate proto file
bogao007 Jul 20, 2024
5d910d8
revert unrelated changes
bogao007 Jul 20, 2024
df859ab
fix
bogao007 Jul 20, 2024
654f2f6
Added unit tests for transformWithStateInPandas
bogao007 Jul 24, 2024
38832a6
Merge branch 'master' into state-v2-initial
bogao007 Jul 24, 2024
0585ac0
fix and rename
bogao007 Jul 24, 2024
0ee5029
update test
bogao007 Jul 24, 2024
6232c81
Added licenses
bogao007 Jul 25, 2024
41f8234
fixed format issues
bogao007 Jul 25, 2024
d57633f
fix
bogao007 Jul 25, 2024
df9ea9e
fix format
bogao007 Jul 25, 2024
68f7a7e
doc
bogao007 Jul 25, 2024
ca5216b
addressed comments
bogao007 Jul 26, 2024
c9e3a7c
structured log
bogao007 Jul 26, 2024
2320805
suppress auto generated proto file
bogao007 Jul 29, 2024
6e5de2e
fix linter
bogao007 Jul 29, 2024
200ec5e
fixed dependency issue
bogao007 Jul 29, 2024
dd3e46b
make protobuf as local dependency
bogao007 Jul 30, 2024
e8360d4
fix dependency issue
bogao007 Jul 30, 2024
82983af
fix
bogao007 Jul 30, 2024
49dbc16
fix lint
bogao007 Jul 30, 2024
d4e04ea
fix
bogao007 Jul 30, 2024
e108f60
updated fix
bogao007 Jul 30, 2024
bae26c2
reformat
bogao007 Jul 30, 2024
d96fa9e
addressed comments
bogao007 Jul 31, 2024
92531db
fix linter
bogao007 Jul 31, 2024
d507793
linter
bogao007 Jul 31, 2024
5dcb4c8
addressed comments
bogao007 Aug 2, 2024
37be02a
address comment
bogao007 Aug 2, 2024
f63687f
addressed comments
bogao007 Aug 9, 2024
263c087
Merge branch 'master' into state-v2-initial
bogao007 Aug 10, 2024
c7b0a4f
address comments
bogao007 Aug 12, 2024
c80b292
address comments
bogao007 Aug 12, 2024
81276f3
address comments
bogao007 Aug 14, 2024
5886b5c
fix lint
bogao007 Aug 14, 2024
23e54b4
fix lint
bogao007 Aug 14, 2024
2ba4fd0
address comments
bogao007 Aug 14, 2024
2a9c20b
fix test
bogao007 Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix lint
  • Loading branch information
bogao007 committed Aug 14, 2024
commit 5886b5c752db2a0bb3981b4da77cf333fa4d64e8
66 changes: 34 additions & 32 deletions python/pyspark/sql/streaming/StateMessage_pb2.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,43 @@
_sym_db = _symbol_database.Default()


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12StateMessage.proto\x12.org.apache.spark.sql.execution.streaming.state\"\xe9\x02\n\x0cStateRequest\x12\x0f\n\x07version\x18\x01 \x01(\x05\x12\x66\n\x15statefulProcessorCall\x18\x02 \x01(\x0b\x32\x45.org.apache.spark.sql.execution.streaming.state.StatefulProcessorCallH\x00\x12\x64\n\x14stateVariableRequest\x18\x03 \x01(\x0b\x32\x44.org.apache.spark.sql.execution.streaming.state.StateVariableRequestH\x00\x12p\n\x1aimplicitGroupingKeyRequest\x18\x04 \x01(\x0b\x32J.org.apache.spark.sql.execution.streaming.state.ImplicitGroupingKeyRequestH\x00\x42\x08\n\x06method\"H\n\rStateResponse\x12\x12\n\nstatusCode\x18\x01 \x01(\x05\x12\x14\n\x0c\x65rrorMessage\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\x0c\"\x89\x03\n\x15StatefulProcessorCall\x12X\n\x0esetHandleState\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetHandleStateH\x00\x12Y\n\rgetValueState\x18\x02 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12X\n\x0cgetListState\x18\x03 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12W\n\x0bgetMapState\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x42\x08\n\x06method\"z\n\x14StateVariableRequest\x12X\n\x0evalueStateCall\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.ValueStateCallH\x00\x42\x08\n\x06method\"\xe0\x01\n\x1aImplicitGroupingKeyRequest\x12X\n\x0esetImplicitKey\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetImplicitKeyH\x00\x12^\n\x11removeImplicitKey\x18\x02 \x01(\x0b\x32\x41.org.apache.spark.sql.execution.streaming.state.RemoveImplicitKeyH\x00\x42\x08\n\x06method\"5\n\x10StateCallCommand\x12\x11\n\tstateName\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\t\"\xe1\x02\n\x0eValueStateCall\x12\x11\n\tstateName\x18\x01 \x01(\t\x12H\n\x06\x65xists\x18\x02 
\x01(\x0b\x32\x36.org.apache.spark.sql.execution.streaming.state.ExistsH\x00\x12\x42\n\x03get\x18\x03 \x01(\x0b\x32\x33.org.apache.spark.sql.execution.streaming.state.GetH\x00\x12\\\n\x10valueStateUpdate\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.ValueStateUpdateH\x00\x12\x46\n\x05\x63lear\x18\x05 \x01(\x0b\x32\x35.org.apache.spark.sql.execution.streaming.state.ClearH\x00\x42\x08\n\x06method\"\x1d\n\x0eSetImplicitKey\x12\x0b\n\x03key\x18\x01 \x01(\x0c\"\x13\n\x11RemoveImplicitKey\"\x08\n\x06\x45xists\"\x05\n\x03Get\"!\n\x10ValueStateUpdate\x12\r\n\x05value\x18\x01 \x01(\x0c\"\x07\n\x05\x43lear\"\\\n\x0eSetHandleState\x12J\n\x05state\x18\x01 \x01(\x0e\x32;.org.apache.spark.sql.execution.streaming.state.HandleState*K\n\x0bHandleState\x12\x0b\n\x07\x43REATED\x10\x00\x12\x0f\n\x0bINITIALIZED\x10\x01\x12\x12\n\x0e\x44\x41TA_PROCESSED\x10\x02\x12\n\n\x06\x43LOSED\x10\x03\x62\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x12StateMessage.proto\x12.org.apache.spark.sql.execution.streaming.state"\xe9\x02\n\x0cStateRequest\x12\x0f\n\x07version\x18\x01 \x01(\x05\x12\x66\n\x15statefulProcessorCall\x18\x02 \x01(\x0b\x32\x45.org.apache.spark.sql.execution.streaming.state.StatefulProcessorCallH\x00\x12\x64\n\x14stateVariableRequest\x18\x03 \x01(\x0b\x32\x44.org.apache.spark.sql.execution.streaming.state.StateVariableRequestH\x00\x12p\n\x1aimplicitGroupingKeyRequest\x18\x04 \x01(\x0b\x32J.org.apache.spark.sql.execution.streaming.state.ImplicitGroupingKeyRequestH\x00\x42\x08\n\x06method"H\n\rStateResponse\x12\x12\n\nstatusCode\x18\x01 \x01(\x05\x12\x14\n\x0c\x65rrorMessage\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\x0c"\x89\x03\n\x15StatefulProcessorCall\x12X\n\x0esetHandleState\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetHandleStateH\x00\x12Y\n\rgetValueState\x18\x02 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12X\n\x0cgetListState\x18\x03 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12W\n\x0bgetMapState\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x42\x08\n\x06method"z\n\x14StateVariableRequest\x12X\n\x0evalueStateCall\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.ValueStateCallH\x00\x42\x08\n\x06method"\xe0\x01\n\x1aImplicitGroupingKeyRequest\x12X\n\x0esetImplicitKey\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetImplicitKeyH\x00\x12^\n\x11removeImplicitKey\x18\x02 \x01(\x0b\x32\x41.org.apache.spark.sql.execution.streaming.state.RemoveImplicitKeyH\x00\x42\x08\n\x06method"5\n\x10StateCallCommand\x12\x11\n\tstateName\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\t"\xe1\x02\n\x0eValueStateCall\x12\x11\n\tstateName\x18\x01 \x01(\t\x12H\n\x06\x65xists\x18\x02 \x01(\x0b\x32\x36.org.apache.spark.sql.execution.streaming.state.ExistsH\x00\x12\x42\n\x03get\x18\x03 
\x01(\x0b\x32\x33.org.apache.spark.sql.execution.streaming.state.GetH\x00\x12\\\n\x10valueStateUpdate\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.ValueStateUpdateH\x00\x12\x46\n\x05\x63lear\x18\x05 \x01(\x0b\x32\x35.org.apache.spark.sql.execution.streaming.state.ClearH\x00\x42\x08\n\x06method"\x1d\n\x0eSetImplicitKey\x12\x0b\n\x03key\x18\x01 \x01(\x0c"\x13\n\x11RemoveImplicitKey"\x08\n\x06\x45xists"\x05\n\x03Get"!\n\x10ValueStateUpdate\x12\r\n\x05value\x18\x01 \x01(\x0c"\x07\n\x05\x43lear"\\\n\x0eSetHandleState\x12J\n\x05state\x18\x01 \x01(\x0e\x32;.org.apache.spark.sql.execution.streaming.state.HandleState*K\n\x0bHandleState\x12\x0b\n\x07\x43REATED\x10\x00\x12\x0f\n\x0bINITIALIZED\x10\x01\x12\x12\n\x0e\x44\x41TA_PROCESSED\x10\x02\x12\n\n\x06\x43LOSED\x10\x03\x62\x06proto3' # noqa: E501
)

_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "StateMessage_pb2", _globals)
if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None
_globals['_HANDLESTATE']._serialized_start=1873
_globals['_HANDLESTATE']._serialized_end=1948
_globals['_STATEREQUEST']._serialized_start=71
_globals['_STATEREQUEST']._serialized_end=432
_globals['_STATERESPONSE']._serialized_start=434
_globals['_STATERESPONSE']._serialized_end=506
_globals['_STATEFULPROCESSORCALL']._serialized_start=509
_globals['_STATEFULPROCESSORCALL']._serialized_end=902
_globals['_STATEVARIABLEREQUEST']._serialized_start=904
_globals['_STATEVARIABLEREQUEST']._serialized_end=1026
_globals['_IMPLICITGROUPINGKEYREQUEST']._serialized_start=1029
_globals['_IMPLICITGROUPINGKEYREQUEST']._serialized_end=1253
_globals['_STATECALLCOMMAND']._serialized_start=1255
_globals['_STATECALLCOMMAND']._serialized_end=1308
_globals['_VALUESTATECALL']._serialized_start=1311
_globals['_VALUESTATECALL']._serialized_end=1664
_globals['_SETIMPLICITKEY']._serialized_start=1666
_globals['_SETIMPLICITKEY']._serialized_end=1695
_globals['_REMOVEIMPLICITKEY']._serialized_start=1697
_globals['_REMOVEIMPLICITKEY']._serialized_end=1716
_globals['_EXISTS']._serialized_start=1718
_globals['_EXISTS']._serialized_end=1726
_globals['_GET']._serialized_start=1728
_globals['_GET']._serialized_end=1733
_globals['_VALUESTATEUPDATE']._serialized_start=1735
_globals['_VALUESTATEUPDATE']._serialized_end=1768
_globals['_CLEAR']._serialized_start=1770
_globals['_CLEAR']._serialized_end=1777
_globals['_SETHANDLESTATE']._serialized_start=1779
_globals['_SETHANDLESTATE']._serialized_end=1871
DESCRIPTOR._loaded_options = None
_globals["_HANDLESTATE"]._serialized_start = 1873
_globals["_HANDLESTATE"]._serialized_end = 1948
_globals["_STATEREQUEST"]._serialized_start = 71
_globals["_STATEREQUEST"]._serialized_end = 432
_globals["_STATERESPONSE"]._serialized_start = 434
_globals["_STATERESPONSE"]._serialized_end = 506
_globals["_STATEFULPROCESSORCALL"]._serialized_start = 509
_globals["_STATEFULPROCESSORCALL"]._serialized_end = 902
_globals["_STATEVARIABLEREQUEST"]._serialized_start = 904
_globals["_STATEVARIABLEREQUEST"]._serialized_end = 1026
_globals["_IMPLICITGROUPINGKEYREQUEST"]._serialized_start = 1029
_globals["_IMPLICITGROUPINGKEYREQUEST"]._serialized_end = 1253
_globals["_STATECALLCOMMAND"]._serialized_start = 1255
_globals["_STATECALLCOMMAND"]._serialized_end = 1308
_globals["_VALUESTATECALL"]._serialized_start = 1311
_globals["_VALUESTATECALL"]._serialized_end = 1664
_globals["_SETIMPLICITKEY"]._serialized_start = 1666
_globals["_SETIMPLICITKEY"]._serialized_end = 1695
_globals["_REMOVEIMPLICITKEY"]._serialized_start = 1697
_globals["_REMOVEIMPLICITKEY"]._serialized_end = 1716
_globals["_EXISTS"]._serialized_start = 1718
_globals["_EXISTS"]._serialized_end = 1726
_globals["_GET"]._serialized_start = 1728
_globals["_GET"]._serialized_end = 1733
_globals["_VALUESTATEUPDATE"]._serialized_start = 1735
_globals["_VALUESTATEUPDATE"]._serialized_end = 1768
_globals["_CLEAR"]._serialized_start = 1770
_globals["_CLEAR"]._serialized_end = 1777
_globals["_SETHANDLESTATE"]._serialized_start = 1779
_globals["_SETHANDLESTATE"]._serialized_end = 1871
# @@protoc_insertion_point(module_scope)
1 change: 1 addition & 0 deletions python/pyspark/sql/streaming/StateMessage_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class HandleState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
INITIALIZED: _ClassVar[HandleState]
DATA_PROCESSED: _ClassVar[HandleState]
CLOSED: _ClassVar[HandleState]

CREATED: HandleState
INITIALIZED: HandleState
DATA_PROCESSED: HandleState
Expand Down
5 changes: 2 additions & 3 deletions python/pyspark/sql/streaming/stateful_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from abc import ABC, abstractmethod
from typing import Any, TYPE_CHECKING, Iterator, Union, cast

from pyspark.sql import Row
from pyspark.sql.streaming.stateful_processor_api_client import StatefulProcessorApiClient
from pyspark.sql.streaming.value_state_client import ValueStateClient
from pyspark.sql.types import StructType, _create_row, _parse_datatype_string
Expand Down Expand Up @@ -50,7 +49,7 @@ def exists(self) -> bool:
"""
return self._value_state_client.exists(self._state_name)

def get(self) -> Row:
def get(self) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we expect a Row as the state value, not a pandas DataFrame. Please let me know if you are proposing a pandas DataFrame because it would better suit more state types.

"""
Get the state value if it exists. Returns None if the state variable does not have a value.
"""
Expand All @@ -61,7 +60,7 @@ def get(self) -> Row:
if isinstance(schema, str):
schema = cast(StructType, _parse_datatype_string(schema))
# Create the Row using the values and schema fields
row = _create_row(schema.fields, value)
row = _create_row(schema.fieldNames(), value)
return row

def update(self, new_value: Any) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pyspark.sql.types import StructType, _parse_datatype_string, Row
from pyspark.sql.utils import has_numpy
from pyspark.serializers import CPickleSerializer
from pyspark.errors import PySparkAttributeError, PySparkRuntimeError
from pyspark.errors import PySparkRuntimeError

__all__ = ["StatefulProcessorApiClient", "StatefulProcessorHandleState"]

Expand Down Expand Up @@ -135,6 +135,7 @@ def _send_proto_message(self, message: bytes) -> None:

def _receive_proto_message(self) -> Tuple[int, str]:
import pyspark.sql.streaming.StateMessage_pb2 as stateMessage

length = read_int(self.sockfile)
bytes = self.sockfile.read(length)
message = stateMessage.StateResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
import os
import tempfile
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.errors import PySparkRuntimeError
from typing import Iterator

import unittest
from typing import cast

from pyspark import SparkConf
from pyspark.sql.functions import split, col
from pyspark.sql.functions import split
from pyspark.sql.types import (
StringType,
StructType,
Expand Down