# [SPARK-16062][SPARK-15989][SQL] Fix two bugs of Python-only UDTs #13778
## Conversation
Test build #60834 has finished for PR 13778 at commit

Test build #60835 has finished for PR 13778 at commit

cc @davies

Test build #60837 has finished for PR 13778 at commit
```diff
-  private def deserializerFor(input: Expression): Expression = input.dataType match {
+  private def deserializerFor(input: Expression): Expression = {
+    deserializerFor(input, input.dataType)
+  }
```
Seems that this method is never used?
uh? It is the original deserializerFor method and is used below and above.
Oh, sorry... Confused by the split diff view...
Here's an unresolved example: https://gist.github.com/vlad17/2db8e14972344c693e8a3f03d91c9c8d

Update: looks like the above is just an issue with the

Another update: https://gist.github.com/vlad17/cfcd42f30ea2380df4fb0bfa30dda7ce unresolved

@vlad17 Thanks! I will look into that issue.
@viirya Do we need to fix this in Spark 2.0? UDTs are private APIs and the only intended use case is Vector/Matrix UDTs for MLlib, which doesn't put vectors or matrices inside an array inside a pipeline. In Spark 2.1, we probably need a formal discussion on merging UDT into Encoder, which could completely change its implementation.

@mengxr Although UDTs are private APIs, as you can see from the example, users can define their own classes and corresponding UDTs in Python, which become `PythonUserDefinedType`. The issues in this PR are not rare cases, and PySpark users will very likely hit them when using 2.0.
Test build #60916 has finished for PR 13778 at commit
```python
df = self.spark.createDataFrame(
    [(i % 3, PythonOnlyPoint(float(i), float(i))) for i in range(10)],
    schema=schema)
df.show()
```
DataFrame.show() adds unnecessary stringification, so this test ends up testing unnecessary stuff (in fact, it would fail if the UDT didn't have `__str__`). I would use collect() to force materialization instead.
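A minimal sketch of that suggestion, reusing the `df` built in the test above (the assertions here are illustrative, not part of the actual patch):

```python
# collect() materializes the rows without stringifying them, so it does not
# depend on the UDT having __str__, yet it still exercises the full
# serialize/deserialize round trip.
rows = df.collect()
self.assertEqual(len(rows), 10)
self.assertTrue(all(isinstance(r.val, PythonOnlyPoint) for r in rows))
```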
This test only fails when using show() as I mentioned on the JIRA SPARK-16062.
Test build #60996 has finished for PR 13778 at commit

Test build #61000 has finished for PR 13778 at commit
ping @vlad17 @davies @liancheng Anything else?

also cc @yhuai @cloud-fan
```scala
    case _ => ""
  }

  val inputDT = inputDataType.getOrElse(inputData.dataType)
```
@cloud-fan I think there is no way to easily catch python udt before MapObjects. The approach I use now is to pass a datatype (python udt's sqlType) to MapObjects.
```diff
 val loopIsNull = "MapObjects_loopIsNull" + curId.getAndIncrement()
 val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
-MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData)
+MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData, None)
```
It is possible that inputData is not resolved yet, so we can't just pass in the data type of inputData. That is why I still make inputDataType an Option[DataType] below.
Test build #61836 has finished for PR 13778 at commit
```scala
 * @param lambdaFunction A function that takes the `loopVar` as input, and is used as the lambda
 *   function to handle collection elements.
 * @param inputData An expression that when evaluated returns a collection object.
 * @param inputDataType The dataType of inputData.
```
Document that it's optional, and say that the default behavior is to use the resolved `.dataType` of `inputData`.

OK.

Added.
Test build #61903 has finished for PR 13778 at commit
ping @cloud-fan @vlad17 Anything else?

LGTM +1
From another point of view, is it necessary to propagate the python UDT from the python side to the jvm side? IIUC the serialization of a python UDT happens on the python side, and the jvm side can only see binary for the python data; there is nothing we can do on the java side. Correct me if I am wrong, thanks.

A python UDT on the python side only serializes the python data to the sql type defined in the UDT. The problem here happens during serialization to a row on the java side, on the already-serialized python data. I don't think we can assume the serialized python data needs no serialization on the java side.

ping @cloud-fan any more concerns?

Can you point out where we catch

Oh, I mean they should be serialized/deserialized by the pickler. So I think the jvm side doesn't just see binary for the python data; it has already been processed by the pickler. That is why we can now process these data in the encoder/decoder with its sql type.

yea, so whatever the data type is (python udt or normal sql type), at the java side there is no difference: the data is converted to the correct format by the pickler. That's why I think maybe it's possible to just pass the corresponding sql type of the python udt to the java side. My only concern is, sometimes we use the schema of the java dataframe as the schema at the python side. If we don't pass the python udt to the java side, the udt information will be lost. @viirya do you mind giving it a try? thanks!

@cloud-fan I just checked the python UDT. On the python side, we serialize the python UDT to binary. The python UDT passed to java includes that binary. Then on the python side, in the worker, we deserialize the binary back to the python UDT and use it for sql data serialization. Because of that, I think we can't just pass the sql type of the python UDT to the java side. What do you think?
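For context, here is a minimal sketch of a Python-only UDT, modeled on the `PythonOnlyUDT`/`PythonOnlyPoint` pair in `pyspark.sql.tests` (the method bodies are illustrative):

```python
from pyspark.sql.types import ArrayType, DoubleType, UserDefinedType

class PythonOnlyUDT(UserDefinedType):
    """A UDT that defines no scalaUDT(), so the JVM side sees it as a
    PythonUserDefinedType carrying the pickled Python class."""

    @classmethod
    def sqlType(cls):
        # The SQL type the Python worker serializes values to. Because it is
        # an ArrayType, the Java-side deserializer applies MapObjects to it,
        # which is the code path fixed by this PR.
        return ArrayType(DoubleType(), False)

    @classmethod
    def module(cls):
        return "__main__"

    def serialize(self, obj):
        # Runs in the Python worker: PythonOnlyPoint -> [x, y].
        return [obj.x, obj.y]

    def deserialize(self, datum):
        # Runs in the Python worker: [x, y] -> PythonOnlyPoint.
        return PythonOnlyPoint(datum[0], datum[1])

class PythonOnlyPoint:
    __UDT__ = PythonOnlyUDT()  # lets PySpark's schema inference pick up the UDT

    def __init__(self, x, y):
        self.x, self.y = x, y
```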
```scala
 * expression will apply MapObjects on it. However, the data type
 * of inputData is a Python UDT, which is not an expected array type
 * in MapObjects. In this case, we need to explicitly use the
 * Python UDT's sqlType as the data type.
```
As we have to mention python udt in MapObjects anyway, I think it makes more sense to add the python udt handling in MapObjects directly.

Do you mean the earlier commit? I remember it is the first approach I took.

But I think it exposes python udt to MapObjects?

But we expose it now too. Readers have to know about python udt to understand this code.

Hmm, ok. Let me update it.

If we could hide python udt from MapObjects entirely, it would be worth doing. But it looks like we can't, and then I think it makes more sense to expose python udt more explicitly.

Makes sense. I will update it later.
@cloud-fan Updated. Please take a look. Thanks.

Test build #62151 has finished for PR 13778 at commit
ping @cloud-fan Please see if this is ok for you now. Thanks.

ping @cloud-fan Can you review this? Thanks.

ping @liancheng @yhuai Maybe you can review this too?

ping @cloud-fan Can you check if this is good for you now? It has been a while. Thanks.

ping @cloud-fan What do you think about this? Can we merge it now? Thanks.

ping @cloud-fan Did you miss this? Or do you have other concerns? Please let me know. Thanks.

ping @cloud-fan again, this has been waiting for a while. Do you have time to look at it again? Thanks.

LGTM, merging this into master and 2.0 branch, thanks!
## What changes were proposed in this pull request?
There are two related bugs of Python-only UDTs. Because the test case of the second one needs the first fix too, I put them into one PR. If that is not appropriate, please let me know.
### First bug: When MapObjects works on Python-only UDTs
`RowEncoder` will use `PythonUserDefinedType.sqlType` for its deserializer expression. If the sql type is `ArrayType`, we will have `MapObjects` working on it. But `MapObjects` doesn't consider `PythonUserDefinedType` as its input data type. It causes an error like:
```python
import pyspark.sql.group
from pyspark.sql.tests import PythonOnlyPoint, PythonOnlyUDT
from pyspark.sql.types import *

schema = StructType().add("key", LongType()).add("val", PythonOnlyUDT())
df = spark.createDataFrame([(i % 3, PythonOnlyPoint(float(i), float(i))) for i in range(10)], schema=schema)
df.show()
```

```
File "/home/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o36.showString.
: java.lang.RuntimeException: Error while decoding: scala.MatchError: org.apache.spark.sql.types.PythonUserDefinedType@f4ceede8 (of class org.apache.spark.sql.types.PythonUserDefinedType)
...
```
### Second bug: When a Python-only UDT is the element type of ArrayType
```python
import pyspark.sql.group
from pyspark.sql.tests import PythonOnlyPoint, PythonOnlyUDT
from pyspark.sql.types import *

schema = StructType().add("key", LongType()).add("val", ArrayType(PythonOnlyUDT()))
df = spark.createDataFrame([(i % 3, [PythonOnlyPoint(float(i), float(i))]) for i in range(10)], schema=schema)
df.show()
```
## How was this patch tested?
PySpark's sql tests.
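A condensed sketch of such a test (hypothetical method name; it assumes the `PythonOnlyPoint`/`PythonOnlyUDT` helpers and the `self.spark` fixture from `pyspark.sql.tests`):

```python
def test_array_of_python_only_udt(self):
    # Array of Python-only UDT values: exercises the second bug above.
    schema = StructType().add("key", LongType()).add("val", ArrayType(PythonOnlyUDT()))
    df = self.spark.createDataFrame(
        [(i % 3, [PythonOnlyPoint(float(i), float(i))]) for i in range(10)],
        schema=schema)
    df.show()  # failed with a MatchError before this fix
```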
Author: Liang-Chi Hsieh <[email protected]>
Closes #13778 from viirya/fix-pyudt.
(cherry picked from commit 146001a)
Signed-off-by: Davies Liu <[email protected]>