Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/pyspark/sql/pandas/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
None,
Expand Down Expand Up @@ -459,8 +459,8 @@ def _validate_pandas_udf(f, evalType) -> int:
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
PythonEvalType.SQL_ARROW_BATCHED_UDF,
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/sql/pandas/group_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,10 @@ def __transformWithState(
elif usePandas and initialState is not None:
functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF
elif not usePandas and initialState is None:
functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF
functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF
else:
# not usePandas and initialState is not None
functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF
functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF

if initialState is None:
initial_state_java_obj = None
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ class PythonEvalType:
SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF: "PandasGroupedMapUDFTransformWithStateInitStateType" = ( # noqa: E501
212
)
SQL_TRANSFORM_WITH_STATE_UDF: "GroupedMapUDFTransformWithStateType" = 213
SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF: "GroupedMapUDFTransformWithStateInitStateType" = ( # noqa: E501
SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF: "GroupedMapUDFTransformWithStateType" = 213
SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF: "GroupedMapUDFTransformWithStateInitStateType" = ( # noqa: E501
214
)
SQL_TABLE_UDF: "SQLTableUDFType" = 300
Expand Down
20 changes: 10 additions & 10 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,9 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profil
return args_offsets, wrap_grouped_transform_with_state_pandas_init_state_udf(
func, return_type, runner_conf
)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF:
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
return args_offsets, wrap_grouped_transform_with_state_udf(func, return_type, runner_conf)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF:
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
return args_offsets, wrap_grouped_transform_with_state_init_state_udf(
func, return_type, runner_conf
)
Expand Down Expand Up @@ -1572,8 +1572,8 @@ def read_udfs(pickleSer, infile, eval_type):
PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF,
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
):
# Load conf used for pandas_udf evaluation
num_conf = read_int(infile)
Expand All @@ -1588,8 +1588,8 @@ def read_udfs(pickleSer, infile, eval_type):
elif (
eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF
or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF
or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF
or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF
or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF
or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF
):
state_server_port = read_int(infile)
if state_server_port == -1:
Expand Down Expand Up @@ -1641,14 +1641,14 @@ def read_udfs(pickleSer, infile, eval_type):
ser = TransformWithStateInPandasInitStateSerializer(
timezone, safecheck, _assign_cols_by_name, arrow_max_records_per_batch
)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF:
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
arrow_max_records_per_batch = runner_conf.get(
"spark.sql.execution.arrow.maxRecordsPerBatch", 10000
)
arrow_max_records_per_batch = int(arrow_max_records_per_batch)

ser = TransformWithStateInPySparkRowSerializer(arrow_max_records_per_batch)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF:
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
arrow_max_records_per_batch = runner_conf.get(
"spark.sql.execution.arrow.maxRecordsPerBatch", 10000
)
Expand Down Expand Up @@ -1889,7 +1889,7 @@ def values_gen():
# mode == PROCESS_TIMER or mode == COMPLETE
return f(stateful_processor_api_client, mode, None, iter([]))

elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF:
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
# We assume there is only one UDF here because grouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1
Expand All @@ -1916,7 +1916,7 @@ def mapper(a):
# mode == PROCESS_TIMER or mode == COMPLETE
return f(stateful_processor_api_client, mode, None, iter([]))

elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF:
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
# We assume there is only one UDF here because grouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1
Expand Down