[SPARK-44877][CONNECT][PYTHON] Support python protobuf functions for Spark Connect #42563
Conversation
```
    <version>${project.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
```
This is needed to avoid `java.lang.NoClassDefFoundError: org/apache/spark/sql/protobuf/CatalystDataToProtobuf`. A similar issue was also mentioned in the Avro support PR.
```
    ],
)

connect = Module(
```
Did you swap the order of `protobuf` and `connect`? The diff looks confusing :).
Yep, I swapped the order so that `protobuf` can be used in `dependencies=[hive, avro, protobuf]`.
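For context, a minimal sketch of that reordering in `dev/sparktestsupport/modules.py` (the field values here are illustrative, not the exact diff): `protobuf` must now be defined before `connect`, since a top-level name has to exist before it can appear in the `dependencies` list.

```
protobuf = Module(
    name="protobuf",
    dependencies=[sql],
    source_file_regexes=["connector/protobuf/"],
)

connect = Module(
    name="connect",
    # protobuf is already defined at this point, so it can be referenced here
    dependencies=[hive, avro, protobuf],
    source_file_regexes=["connector/connect/"],
)
```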
```
protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf")
if protobuf_jar is None:
    print(
        "Skipping all Protobuf Python tests as the optional Protobuf project was "
        "not compiled into a JAR. To run these tests, "
        "you need to build Spark with 'build/sbt package' or "
        "'build/mvn package' before running this test."
    )
    sys.exit(0)
```
This should not fail like this, right?
This part is just copied from the original protobuf functions. Should we do it differently here?
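If it were done differently, one alternative (a sketch only, assuming these checks run under `unittest`; this is not what the PR does) would be to skip through the test framework instead of exiting the interpreter:

```
import unittest

# `protobuf_jar` comes from the `search_jar(...)` lookup shown above.
if protobuf_jar is None:
    raise unittest.SkipTest(
        "spark-protobuf JAR not found; build it with "
        "'build/sbt package' or 'build/mvn package' first."
    )
```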
| "from_protobuf", _to_col(data), lit(messageName), lit(binary_proto) | ||
| ) | ||
| else: | ||
| return _invoke_function( |
So many calls to `_invoke_function()`. It would be much simpler if it allowed `None` options and ignored them. @bogao007 could you include a comment about it?
cc: @LuciferYang, @HyukjinKwon
`_invoke_function()` doesn't seem to support `None` input. I will include a comment for it.
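To illustrate the suggestion (a hypothetical helper, not part of the PR or of the `_invoke_function()` API): a thin wrapper that drops `None` arguments would collapse the branches into a single call.

```
def _invoke_function_ignore_none(name, *args):
    # Hypothetical wrapper: forward only the non-None arguments.
    return _invoke_function(name, *[arg for arg in args if arg is not None])

# Both from_protobuf branches could then reduce to one call along the lines of:
# return _invoke_function_ignore_none(
#     "from_protobuf", _to_col(data), lit(messageName), desc_file_col, binary_proto_col
# )
# (desc_file_col / binary_proto_col are illustrative names, not real variables.)
```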
@HyukjinKwon could you help take a look at this PR? This needs to go to the 3.5 branch as well, thanks!
rangadi left a comment
LGTM.
…Spark Connect
### What changes were proposed in this pull request?
Support python protobuf functions for Spark Connect
### Why are the changes needed?
Support python protobuf functions for Spark Connect
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Added doctests and did a manual test:
```
bo.gaoPF2WXGJ3KT spark % bin/pyspark --remote "local[*]" --jars connector/protobuf/target/scala-2.12/spark-protobuf_2.12-4.0.0-SNAPSHOT.jar
Python 3.9.6 (default, May 7 2023, 23:32:44)
[Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
23/08/18 10:47:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/Users/bo.gao/workplace/spark/python/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
warnings.warn(
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 4.0.0.dev0
/_/
Using Python version 3.9.6 (default, May 7 2023 23:32:44)
Client connected to the Spark Connect server at localhost
SparkSession available as 'spark'.
>>> from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
>>> import tempfile
>>> data = [([(2, "Alice", 13093020)])]
>>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
... '26F746F33')
>>> with tempfile.TemporaryDirectory() as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
... f.flush()
... message_name = 'SimpleMessage'
... proto_df = df.select(
... to_protobuf(df.value, message_name, desc_file_path).alias("value"))
... proto_df.show(truncate=False)
... proto_df_1 = proto_df.select( # With file name for descriptor
... from_protobuf(proto_df.value, message_name, desc_file_path).alias("value"))
... proto_df_1.show(truncate=False)
... proto_df_2 = proto_df.select( # With binary for descriptor
... from_protobuf(proto_df.value, message_name,
... binaryDescriptorSet = bytearray.fromhex(desc_hex))
... .alias("value"))
... proto_df_2.show(truncate=False)
...
+-------------------------------------------+
|value |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
+--------------------+
|value |
+--------------------+
|{2, Alice, 13093020}|
+--------------------+
+--------------------+
|value |
+--------------------+
|{2, Alice, 13093020}|
+--------------------+
```
```
>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> to_proto_df = df.select(to_protobuf(df.value, message_class_name).alias("value"))
>>> from_proto_df = to_proto_df.select(
... from_protobuf(to_proto_df.value, message_class_name).alias("value"))
>>> from_proto_df.show(truncate=False)
+------------------+
|value |
+------------------+
|{1668035962, 2020}|
+------------------+
```
```
>>> import tempfile
>>> data = [([(2, "Alice", 13093020)])]
>>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
... '26F746F33')
>>> with tempfile.TemporaryDirectory() as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
... f.flush()
... message_name = 'SimpleMessage'
... proto_df = df.select( # With file name for descriptor
... to_protobuf(df.value, message_name, desc_file_path).alias("suite"))
... proto_df.show(truncate=False)
... proto_df_2 = df.select( # With binary for descriptor
... to_protobuf(df.value, message_name,
... binaryDescriptorSet=bytearray.fromhex(desc_hex))
... .alias("suite"))
... proto_df_2.show(truncate=False)
...
+-------------------------------------------+
|suite |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
+-------------------------------------------+
|suite |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
```
```
>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> proto_df = df.select(to_protobuf(df.value, message_class_name).alias("suite"))
>>> proto_df.show(truncate=False)
+----------------------------+
|suite |
+----------------------------+
|[08 FA EA B0 9B 06 10 E4 0F]|
+----------------------------+
```
Closes #42563 from bogao007/python-connect-protobuf.
Authored-by: bogao007 <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 5151b5b)
Signed-off-by: Ruifeng Zheng <[email protected]>
merged to master and branch-3.5
…setup.py

### What changes were proposed in this pull request?
This PR is a followup of #42563 (but using a new JIRA as it's already released), which adds `pyspark.sql.connect.protobuf` into `setup.py`.

### Why are the changes needed?
So PyPI-packaged PySpark can support protobuf functions with Spark Connect on.

### Does this PR introduce _any_ user-facing change?
Yes. The new feature is now available with Spark Connect on if users install Spark Connect by `pip`.

### How was this patch tested?
Being tested in #45870

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45924 from HyukjinKwon/SPARK-47762.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
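For reference, a minimal sketch of the shape of that change (the real `setup.py` lists many more packages; the names below are only the ones relevant to this followup):

```
from setuptools import setup

setup(
    name="pyspark-sketch",  # hypothetical name; illustrative only
    packages=[
        "pyspark.sql.connect",
        "pyspark.sql.connect.protobuf",  # the package added by this followup
    ],
)
```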