Commits (55 commits in this PR; the diff below shows changes from 48 commits)
b1175e4  [WIP] Poc. (sahnib, Jan 22, 2024)
0a98ed8  Introduce Protobuf. (sahnib, Feb 6, 2024)
8e2b193  Fixing things. (sahnib, Feb 6, 2024)
16e4c17  support timeMode for python state v2 API (bogao007, Jun 20, 2024)
92ef716  Add protobuf for serde (sahnib, Feb 13, 2024)
c3eaf38  protobuf change (bogao007, Jun 20, 2024)
609d94e  Initial commit (bogao007, Jun 27, 2024)
a27f9d9  better error handling, support value state with different types (bogao007, Jun 28, 2024)
684939b  addressed comments (bogao007, Jul 3, 2024)
7f65fbd  fix (bogao007, Jul 3, 2024)
c25d7da  Added support for unix domain socket (bogao007, Jul 11, 2024)
9c8c616  removed unrelated log lines, addressed part of the comments (bogao007, Jul 17, 2024)
c641192  fix (bogao007, Jul 17, 2024)
8d3da4e  Addressed comments (bogao007, Jul 19, 2024)
cc9bf95  removed unnecessary print (bogao007, Jul 19, 2024)
f7df2dc  rename (bogao007, Jul 19, 2024)
27cd169  fix (bogao007, Jul 19, 2024)
3b5b3e5  removed duplicate proto file (bogao007, Jul 20, 2024)
5d910d8  revert unrelated changes (bogao007, Jul 20, 2024)
df859ab  fix (bogao007, Jul 20, 2024)
654f2f6  Added unit tests for transformWithStateInPandas (bogao007, Jul 24, 2024)
38832a6  Merge branch 'master' into state-v2-initial (bogao007, Jul 24, 2024)
0585ac0  fix and rename (bogao007, Jul 24, 2024)
0ee5029  update test (bogao007, Jul 24, 2024)
6232c81  Added lisences (bogao007, Jul 25, 2024)
41f8234  fixed format issues (bogao007, Jul 25, 2024)
d57633f  fix (bogao007, Jul 25, 2024)
df9ea9e  fix format (bogao007, Jul 25, 2024)
68f7a7e  doc (bogao007, Jul 25, 2024)
ca5216b  addressed comments (bogao007, Jul 26, 2024)
c9e3a7c  structured log (bogao007, Jul 26, 2024)
2320805  suppress auto generated proto file (bogao007, Jul 29, 2024)
6e5de2e  fix linter (bogao007, Jul 29, 2024)
200ec5e  fixed dependency issue (bogao007, Jul 29, 2024)
dd3e46b  make protobuf as local dependency (bogao007, Jul 30, 2024)
e8360d4  fix dependency issue (bogao007, Jul 30, 2024)
82983af  fix (bogao007, Jul 30, 2024)
49dbc16  fix lint (bogao007, Jul 30, 2024)
d4e04ea  fix (bogao007, Jul 30, 2024)
e108f60  updated fix (bogao007, Jul 30, 2024)
bae26c2  reformat (bogao007, Jul 30, 2024)
d96fa9e  addressed comments (bogao007, Jul 31, 2024)
92531db  fix linter (bogao007, Jul 31, 2024)
d507793  linter (bogao007, Jul 31, 2024)
5dcb4c8  addressed comments (bogao007, Aug 2, 2024)
37be02a  address comment (bogao007, Aug 2, 2024)
f63687f  addressed comments (bogao007, Aug 9, 2024)
263c087  Merge branch 'master' into state-v2-initial (bogao007, Aug 10, 2024)
c7b0a4f  address comments (bogao007, Aug 12, 2024)
c80b292  address comments (bogao007, Aug 12, 2024)
81276f3  address comments (bogao007, Aug 14, 2024)
5886b5c  fix lint (bogao007, Aug 14, 2024)
23e54b4  fix lint (bogao007, Aug 14, 2024)
2ba4fd0  address comments (bogao007, Aug 14, 2024)
2a9c20b  fix test (bogao007, Aug 14, 2024)
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3858,6 +3858,12 @@
],
"sqlState" : "42802"
},
"STATEFUL_PROCESSOR_UNKNOWN_TIME_MODE" : {
"message" : [
"Unknown time mode <timeMode>. Accepted timeMode modes are 'none', 'processingTime', 'eventTime'"
],
"sqlState" : "42802"
},
"STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS" : {
"message" : [
"Failed to create column family with unsupported starting character and name=<colFamilyName>."
@@ -749,6 +749,7 @@ private[spark] object LogKeys {
case object START_INDEX extends LogKey
case object START_TIME extends LogKey
case object STATEMENT_ID extends LogKey
case object STATE_NAME extends LogKey
case object STATE_STORE_ID extends LogKey
case object STATE_STORE_PROVIDER extends LogKey
case object STATE_STORE_VERSION extends LogKey
@@ -61,6 +61,7 @@ private[spark] object PythonEvalType {
val SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE = 208
val SQL_GROUPED_MAP_ARROW_UDF = 209
val SQL_COGROUPED_MAP_ARROW_UDF = 210
val SQL_TRANSFORM_WITH_STATE_PANDAS_UDF = 211

val SQL_TABLE_UDF = 300
val SQL_ARROW_TABLE_UDF = 301
@@ -82,6 +83,7 @@ private[spark] object PythonEvalType {
case SQL_COGROUPED_MAP_ARROW_UDF => "SQL_COGROUPED_MAP_ARROW_UDF"
case SQL_TABLE_UDF => "SQL_TABLE_UDF"
case SQL_ARROW_TABLE_UDF => "SQL_ARROW_TABLE_UDF"
case SQL_TRANSFORM_WITH_STATE_PANDAS_UDF => "SQL_TRANSFORM_WITH_STATE_PANDAS_UDF"
}
}

2 changes: 2 additions & 0 deletions dev/checkstyle-suppressions.xml
@@ -68,4 +68,6 @@
files="src/main/java/org/apache/spark/network/util/LimitedInputStream.java" />
<suppress checks="Header"
files="src/test/java/org/apache/spark/util/collection/TestTimSort.java" />
<suppress checks=".*"
files="src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java"/>
</suppressions>
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/index.rst
@@ -44,3 +44,4 @@ This page gives an overview of all public Spark SQL API.
variant_val
protobuf
datasource
stateful_processor
29 changes: 29 additions & 0 deletions python/docs/source/reference/pyspark.sql/stateful_processor.rst
@@ -0,0 +1,29 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.


==================
Stateful Processor
==================
.. currentmodule:: pyspark.sql.streaming

.. autosummary::
:toctree: api/

StatefulProcessor.init
StatefulProcessor.handleInputRows
StatefulProcessor.close
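The three methods listed above are the whole user-facing contract. Below is a minimal sketch of a processor skeleton, assuming the classes exposed from pyspark.sql.streaming in this PR and mirroring the state-access pattern from the transformWithStateInPandas docstring later in the diff (a reviewer there questions whether ValueState.get() returns a pandas DataFrame or a Row, so treat that access as illustrative):

# Minimal sketch of a StatefulProcessor implementation (illustrative only).
from typing import Iterator

import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import IntegerType, StructField, StructType

state_schema = StructType([StructField("value", IntegerType(), True)])

class RowCountProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Register a value state before any input is processed.
        self.count_state = handle.getValueState("rowCount", state_schema)

    def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
        # Read the previous count, if any (access pattern mirrors the docstring example).
        prev = self.count_state.get().get("value")[0] if self.count_state.exists() else 0
        total = int(prev) + sum(len(pdf) for pdf in rows)
        self.count_state.update((total,))
        yield pd.DataFrame({"id": key, "count": total})

    def close(self) -> None:
        # Release any resources acquired in init(); nothing to do here.
        pass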
1 change: 1 addition & 0 deletions python/pyspark/sql/pandas/_typing/__init__.pyi
@@ -55,6 +55,7 @@ ArrowMapIterUDFType = Literal[207]
PandasGroupedMapUDFWithStateType = Literal[208]
ArrowGroupedMapUDFType = Literal[209]
ArrowCogroupedMapUDFType = Literal[210]
PandasGroupedMapUDFTransformWithStateType = Literal[211]

class PandasVariadicScalarToScalarFunction(Protocol):
def __call__(self, *_: DataFrameOrSeriesLike_) -> DataFrameOrSeriesLike_: ...
2 changes: 2 additions & 0 deletions python/pyspark/sql/pandas/functions.py
@@ -413,6 +413,7 @@ def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
None,
@@ -453,6 +454,7 @@ def _validate_pandas_udf(f, evalType) -> int:
PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
PythonEvalType.SQL_ARROW_BATCHED_UDF,
154 changes: 153 additions & 1 deletion python/pyspark/sql/pandas/group_ops.py
@@ -15,14 +15,19 @@
# limitations under the License.
#
import sys
from typing import List, Union, TYPE_CHECKING, cast
from typing import Any, Iterator, List, Union, TYPE_CHECKING, cast
import warnings

from pyspark.errors import PySparkTypeError
from pyspark.util import PythonEvalType
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.streaming.state import GroupStateTimeout
from pyspark.sql.streaming.stateful_processor_api_client import (
StatefulProcessorApiClient,
StatefulProcessorHandleState,
)
from pyspark.sql.streaming.stateful_processor import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, _parse_datatype_string

if TYPE_CHECKING:
@@ -33,6 +38,7 @@
PandasCogroupedMapFunction,
ArrowGroupedMapFunction,
ArrowCogroupedMapFunction,
DataFrameLike as PandasDataFrameLike,
)
from pyspark.sql.group import GroupedData

@@ -358,6 +364,152 @@ def applyInPandasWithState(
)
return DataFrame(jdf, self.session)

def transformWithStateInPandas(
self,
statefulProcessor: StatefulProcessor,
outputStructType: Union[StructType, str],
outputMode: str,
timeMode: str,
) -> DataFrame:
"""
Invokes methods defined in the stateful processor used in arbitrary state API v2. It
requires protobuf as a dependency to transmit state messages/data. We allow the user to act
on per-group set of input rows along with keyed state and the user can choose to
output/return 0 or more rows.

For a streaming dataframe, we will repeatedly invoke the interface methods for new rows
in each trigger and the user's state/state variables will be stored persistently across
invocations.

The `statefulProcessor` should be a Python class that implements the interface defined in
pyspark.sql.streaming.stateful_processor.StatefulProcessor.

The `outputStructType` should be a :class:`StructType` describing the schema of all
elements in the returned value, `pandas.DataFrame`. The column labels of all elements in
returned `pandas.DataFrame` must either match the field names in the defined schema if
specified as strings, or match the field data types by position if not strings,
e.g. integer indices.

The size of each `pandas.DataFrame` in both the input and output can be arbitrary. The
number of `pandas.DataFrame` in both the input and output can also be arbitrary.

.. versionadded:: 4.0.0

Parameters
----------
statefulProcessor : :class:`pyspark.sql.streaming.stateful_processor.StatefulProcessor`
Instance of StatefulProcessor whose functions will be invoked by the operator.
outputStructType : :class:`pyspark.sql.types.DataType` or str
The type of the output records. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
outputMode : str
The output mode of the stateful processor.
timeMode : str
The time mode semantics of the stateful processor for timers and TTL.

Examples
--------
>>> from typing import Iterator
...
>>> import pandas as pd # doctest: +SKIP
...
>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import col, split
>>> from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
>>> from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType
>>> spark.conf.set("spark.sql.streaming.stateStore.providerClass",
... "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
... # Below is a simple example of a stateful processor that counts the number of violations
... # for a set of temperature sensors. A violation is defined when the temperature is above
... # 100.
... # The input data is a DataFrame with the following schema:
... # `id: string, temperature: long`.
... # The output schema and state schema are defined as below.
>>> output_schema = StructType([
... StructField("id", StringType(), True),
... StructField("count", IntegerType(), True)
... ])
>>> state_schema = StructType([
... StructField("value", IntegerType(), True)
... ])
>>> class SimpleStatefulProcessor(StatefulProcessor):
... def init(self, handle: StatefulProcessorHandle):
... self.num_violations_state = handle.getValueState("numViolations", state_schema)
...
... def handleInputRows(self, key, rows):
... new_violations = 0
... count = 0
... exists = self.num_violations_state.exists()
... if exists:
... existing_violations_pdf = self.num_violations_state.get()
[Inline review comment, Contributor] What's the expectation of the type of this state "value"? From the variable name pdf and also the way we get the number, I suspect this to be a pandas DataFrame, while the right type should be Row.
... existing_violations = existing_violations_pdf.get("value")[0]
... else:
... existing_violations = 0
... for pdf in rows:
... pdf_count = pdf.count()
... count += pdf_count.get('temperature')
... violations_pdf = pdf.loc[pdf['temperature'] > 100]
... new_violations += violations_pdf.count().get('temperature')
... updated_violations = new_violations + existing_violations
... self.num_violations_state.update((updated_violations,))
... yield pd.DataFrame({'id': key, 'count': count})
[Inline review comment, Contributor] I guess the explanation is to produce the number of violations instead of the number of inputs. This doesn't follow the explanation.
...
... def close(self) -> None:
... pass
...
>>> df.groupBy("value").transformWithStateInPandas(statefulProcessor =
... SimpleStatefulProcessor(), outputStructType=output_schema, outputMode="Update",
... timeMode="None") # doctest: +SKIP

Notes
-----
This function requires a full shuffle.

This API is experimental.
"""

from pyspark.sql import GroupedData
from pyspark.sql.functions import pandas_udf

assert isinstance(self, GroupedData)

def transformWithStateUDF(
statefulProcessorApiClient: StatefulProcessorApiClient,
key: Any,
inputRows: Iterator["PandasDataFrameLike"],
) -> Iterator["PandasDataFrameLike"]:
handle = StatefulProcessorHandle(statefulProcessorApiClient)

if statefulProcessorApiClient.handle_state == StatefulProcessorHandleState.CREATED:
statefulProcessor.init(handle)
statefulProcessorApiClient.set_handle_state(
StatefulProcessorHandleState.INITIALIZED
)

statefulProcessorApiClient.set_implicit_key(key)
result = statefulProcessor.handleInputRows(key, inputRows)

return result

if isinstance(outputStructType, str):
outputStructType = cast(StructType, _parse_datatype_string(outputStructType))

udf = pandas_udf(
transformWithStateUDF, # type: ignore
returnType=outputStructType,
functionType=PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
)
df = self._df
udf_column = udf(*[df[col] for col in df.columns])

jdf = self._jgd.transformWithStateInPandas(
udf_column._jc.expr(),
self.session._jsparkSession.parseDataType(outputStructType.json()),
outputMode,
timeMode,
)
return DataFrame(jdf, self.session)

def applyInArrow(
self, func: "ArrowGroupedMapFunction", schema: Union[StructType, str]
) -> "DataFrame":
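For orientation, here is a hedged end-to-end sketch of how the new operator is wired into a streaming query. The rate source, derived columns, and console sink are illustrative; the operator call mirrors the docstring example above, grouping by "id" to match the processor's stated input schema, and assumes SimpleStatefulProcessor from that example is already defined:

# Usage sketch (illustrative): run SimpleStatefulProcessor from the docstring
# example above against a rate source and print updates to the console.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)

# Fabricate an `id: string, temperature: long` stream from the rate source.
df = (
    spark.readStream.format("rate").option("rowsPerSecond", 10).load()
    .selectExpr("CAST(value % 5 AS STRING) AS id", "value AS temperature")
)

result = df.groupBy("id").transformWithStateInPandas(
    statefulProcessor=SimpleStatefulProcessor(),  # defined in the docstring example
    outputStructType="id string, count int",
    outputMode="Update",
    timeMode="None",
)

query = result.writeStream.format("console").outputMode("update").start()
query.awaitTermination()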
75 changes: 74 additions & 1 deletion python/pyspark/sql/pandas/serializers.py
@@ -19,9 +19,16 @@
Serializers for PyArrow and pandas conversions. See `pyspark.serializers` for more details.
"""

from itertools import groupby
from pyspark.errors import PySparkRuntimeError, PySparkTypeError, PySparkValueError
from pyspark.loose_version import LooseVersion
from pyspark.serializers import Serializer, read_int, write_int, UTF8Deserializer, CPickleSerializer
from pyspark.serializers import (
Serializer,
read_int,
write_int,
UTF8Deserializer,
CPickleSerializer,
)
from pyspark.sql.pandas.types import (
from_arrow_type,
to_arrow_type,
@@ -1116,3 +1123,69 @@ def init_stream_yield_batches(batches):
batches_to_write = init_stream_yield_batches(serialize_batches())

return ArrowStreamSerializer.dump_stream(self, batches_to_write, stream)


class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
"""
Serializer used by Python worker to evaluate UDF for transformWithStateInPandasSerializer.

Parameters
----------
timezone : str
A timezone to respect when handling timestamp values
safecheck : bool
If True, conversion from Arrow to Pandas checks for overflow/truncation
assign_cols_by_name : bool
If True, then Pandas DataFrames will get columns by name
arrow_max_records_per_batch : int
Limit of the number of records that can be written to a single ArrowRecordBatch in memory.
"""

def __init__(self, timezone, safecheck, assign_cols_by_name, arrow_max_records_per_batch):
super(TransformWithStateInPandasSerializer, self).__init__(
timezone, safecheck, assign_cols_by_name
)
self.arrow_max_records_per_batch = arrow_max_records_per_batch
self.key_offsets = None

def load_stream(self, stream):
"""
Read ArrowRecordBatches from stream, deserialize them to populate a list of data chunk, and
convert the data into a list of pandas.Series.

Please refer to the doc of the inner function `generate_data_batches` for more details on
how this function works overall.
"""
import pyarrow as pa

def generate_data_batches(batches):
"""
Deserialize ArrowRecordBatches and return a generator of pandas.Series list.

The deserialization logic assumes that Arrow RecordBatches contain the data with the
ordering that data chunks for same grouping key will appear sequentially.

This function must avoid materializing multiple Arrow RecordBatches into memory at the
same time. And data chunks from the same grouping key should appear sequentially.
"""
for batch in batches:
data_pandas = [
self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()
]
key_series = [data_pandas[o] for o in self.key_offsets]
batch_key = tuple(s[0] for s in key_series)
yield (batch_key, data_pandas)

_batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
data_batches = generate_data_batches(_batches)

for k, g in groupby(data_batches, key=lambda x: x[0]):
yield (k, g)

def dump_stream(self, iterator, stream):
"""
Read through an iterator of (iterator of pandas DataFram), serialize them to Arrow
RecordBatches, and write batches to stream.
[Inline review comment, Contributor] nit: DataFrame
"""
result = [(b, t) for x in iterator for y, t in x for b in y]
super().dump_stream(result, stream)
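
The grouping contract of load_stream above, tagging each deserialized batch with its grouping key and then collapsing consecutive same-key batches with itertools.groupby, can be illustrated without Arrow or Spark; the batches and key offsets below are fabricated for the example:

# Standalone illustration of the per-key grouping done in load_stream.
from itertools import groupby

import pandas as pd

key_offsets = [0]  # column position(s) of the grouping key

# Stand-ins for deserialized Arrow batches, already converted to lists of
# pandas.Series; chunks for the same key arrive consecutively, as assumed above.
batches = [
    [pd.Series(["a", "a"]), pd.Series([101, 99])],
    [pd.Series(["a"]), pd.Series([120])],
    [pd.Series(["b", "b"]), pd.Series([95, 130])],
]

def generate_data_batches(batches):
    for data_pandas in batches:
        key_series = [data_pandas[o] for o in key_offsets]
        batch_key = tuple(s[0] for s in key_series)
        yield (batch_key, data_pandas)

for key, group in groupby(generate_data_batches(batches), key=lambda x: x[0]):
    # `group` lazily yields every (key, chunk) pair for this key, in arrival order.
    print(key, sum(1 for _ in group))  # ('a',) 2 then ('b',) 1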