Skip to content

Conversation

@bogao007
Copy link
Contributor

@bogao007 bogao007 commented Dec 11, 2024

What changes were proposed in this pull request?

Fix string schema for StatefulProcessorHandle, it was throwing an error before when passing the schema as String type because the utility method we used _parse_datatype_string requires a SparkContext which is not available on executors.

The way we support it is to create a new API ParseStringSchema from the client side (Python worker) to server side (JVM). Client passes a string schema to the server, we do the parsing on server side and then send the result back to the client.

Why are the changes needed?

This is to fix an issue/bug with the existing code.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Updated existing test cases to include string schemas.

Was this patch authored or co-authored using generative AI tooling?

No.

@bogao007
Copy link
Contributor Author

bogao007 commented Dec 11, 2024

@LuciferYang Do you know why the python codegen test failed here? I was using the ./dev/streaming-gen-protos.sh script that you created in this PR. Do we need to update the check-protos.py to accommodate the changes in StateMessage_pb2.py and StateMessage_pb2.pyi?

@LuciferYang
Copy link
Contributor

LuciferYang commented Dec 11, 2024

After executing dev/streaming-gen-protos.sh locally, I noticed the following changes in StateMessage_pb2.pyi:

diff --git a/python/pyspark/sql/streaming/proto/StateMessage_pb2.pyi b/python/pyspark/sql/streaming/proto/StateMessage_pb2.pyi
index 528f8dc4dcc..6d2be94f9d8 100644
--- a/python/pyspark/sql/streaming/proto/StateMessage_pb2.pyi
+++ b/python/pyspark/sql/streaming/proto/StateMessage_pb2.pyi
@@ -54,7 +54,7 @@ class _HandleState:
 class _HandleStateEnumTypeWrapper(
     google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_HandleState.ValueType],
     builtins.type,
-):
+):  # noqa: F821
     DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
     CREATED: _HandleState.ValueType  # 0
     INITIALIZED: _HandleState.ValueType  # 1

Is there any local file changes that hasn't been committed?

"""
self.stateful_processor_api_client.get_value_state(state_name, schema, ttl_duration_ms)
return ValueState(ValueStateClient(self.stateful_processor_api_client), state_name, schema)
schema_struct = self.stateful_processor_api_client.get_value_state(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It's not very straightforward to understand we request initialization for State and we get the schema as a result. (Arguably it's already confusing we get nothing from get_XXX, though it's following the current name convention.)

Why not just have a separate method? It's not very heavyweight even we have a separate request call(s) for schema, and it's only used for string schema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I'm open for voices, I know this reduces one round trip and there would be people who prefers performance over cleaner code.
cc. @anishshri-db It'd be good to hear about your input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea - probably better to have a separate API for this. As @HeartSaVioR mentioned, could we only do the conversion if string is passed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, will update, thanks!

Copy link
Contributor

@jingz-db jingz-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM! Will take another look after moving the schema into a separate call.

schema: Union[StructType, str],
) -> None:
self._stateful_processor_api_client = stateful_processor_api_client
if isinstance(schema, str):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make schema part of the class constructor to avoid multiple API calls for parsing.

message.getMethodCase match {
case UtilsRequest.MethodCase.PARSESTRINGSCHEMA =>
val stringSchema = message.getParseStringSchema.getSchema
val schema = CatalystSqlParser.parseTableSchema(stringSchema)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we have an exception handling here for parse exception thrown from parseTableSchema when users pass in invalid schema string? It seems currently we will always send back return code 0 even if the parsing failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have exception handling logic on the server side, if parseTableSchema throws an error, it will be handled here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's think of the outcome for users. If they make a mistake on the schema string, which error message they will get? This should be a part of error class.

I know we have TODOs for error class and I'm OK to file another one - we will need to address it before Spark 4.0, but it's not very close as of now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the error we got when passing a schema like count inttttt

pyspark.errors.exceptions.base.PySparkRuntimeError: Error parsing string schema:
[UNSUPPORTED_DATATYPE] Unsupported data type "INTTTTT". SQLSTATE: 0A000 (line 1, pos 6)

== SQL ==
count inttttt
------^^^

This error is already handled in the bigger try-catch block here and we throw the error in stateful_processor_api_client._parse_string_schema(), we could just use the same TODO(SPARK-49233) there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK let's deal with this in existing TODO.

Copy link
Contributor

@jingz-db jingz-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small comment and otherwise LGTM! Thanks for making the change.

verify(arrowStreamWriter).finalizeCurrentArrowBatch()
}

test("utils request - parse string schema") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also have the test for negative case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we use a bigger try-catch block under run() method, it's hard to test with. We could either have a specific try-catch block just for this call, or have a negative test on python side, which one do you prefer? cc @jingz-db

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with having negative test on python side. I'm also OK with having follow up PR for this.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good in overall - left a couple comments.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, let's address minor comments as follow-up.

@HeartSaVioR
Copy link
Contributor

Thanks! Merging to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants