[SPARK-19311][SQL] fix UDT hierarchy issue #16660
Conversation
cc @cloud-fan @rxin Although UDT is now a private API, some developers still use it by defining their code inside a spark package. I am not sure whether you think we need to fix this or not.
Is it possible to add a unit test? The change LGTM.
LGTM too. @gmoehler Can you add a unit test?
ok to test |
Test build #71755 has finished for PR 16660 at commit
…ewhere? Test case failure is:

- SPARK-19311: UDFs disregard UDT type hierarchy *** FAILED ***
  org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations (100) reached for batch Resolution, tree:
  Project [UDF(cast(cast(cast( … many repeated `cast(… as exampleBaseType)` wrappers around UDF(41) … ))) AS UDF(UDF(41))apache#166]
  +- SubqueryAlias tmp_table
     +- Project [_1#157 AS id#160, _2#158 AS saying#161]
        +- LocalRelation [_1#157, _2#158]
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:105)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:64)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:62)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
  at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:61)
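The "Max iterations (100) reached" failure above is the analyzer's fixed-point rule executor giving up: apparently, because the sub-UDT was not accepted where the base UDT was expected, each Resolution pass wrapped the expression in yet another cast and the tree never stabilized. A toy model of that mechanism (all names here are hypothetical, not Spark's API):

```scala
// Toy fixed-point rule executor with a max-iteration cap: if a rule keeps
// changing the tree (here, endlessly wrapping a value in Cast nodes), the
// executor aborts after maxIterations, much like the trace above.
sealed trait Expr
case class Leaf(v: Int) extends Expr
case class Cast(child: Expr) extends Expr

def execute(rule: Expr => Expr, start: Expr, maxIterations: Int): Either[String, Expr] = {
  var cur = start
  var i = 0
  while (i < maxIterations) {
    val next = rule(cur)
    if (next == cur) return Right(cur) // fixed point reached, done
    cur = next
    i += 1
  }
  Left(s"Max iterations ($maxIterations) reached")
}
```

An identity rule converges immediately, while a rule that always adds a cast hits the cap.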
Test build #71841 has finished for PR 16660 at commit
Test build #71846 has finished for PR 16660 at commit
`// object and classes to test SPARK-19311`
`// Trait/Interface for base type`
Please fix the indentation.
Corrected in the latest commit ... thanks for pointing it out.
Test build #71849 has finished for PR 16660 at commit
`// a base class`
`class ExampleBaseClass(override val field: Int) extends IExampleBaseType {`
`  override def toString: String = field.toString`
Why override `toString` here?
Please simplify it to `class ExampleBaseClass(override val field: Int) extends IExampleBaseType`.
@gmoehler I think we don't need `toString`?
ok
Test build #71852 has finished for PR 16660 at commit
`import org.apache.spark.sql.test.SharedSQLContext`
`import org.apache.spark.sql.types._`
Nit: Please remove this empty line to avoid unnecessary changes.
ok
`  extends ExampleBaseClass(field) with IExampleSubType`
`// UDT for base class`
`private[spark] class ExampleBaseTypeUDT extends UserDefinedType[IExampleBaseType] {`
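For context, the type hierarchy exercised by this test can be reconstructed from the names appearing in the review (a sketch only; the final committed code may differ):

```scala
// Sketch of the SPARK-19311 test hierarchy: a base interface, a sub
// interface, and a concrete class for each, with the sub class extending
// the base class and mixing in the sub interface.
trait IExampleBaseType { def field: Int }
trait IExampleSubType extends IExampleBaseType

class ExampleBaseClass(override val field: Int) extends IExampleBaseType
class ExampleSubClass(field: Int)
  extends ExampleBaseClass(field) with IExampleSubType
```

An `ExampleSubClass` instance is thus also an `IExampleBaseType`, which is exactly the subtyping the UDF resolution needs to respect.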
`private[spark]` is not needed here.
ok
`override def typeName: String = "exampleBaseType"`
`private[spark] override def asNullable: ExampleBaseTypeUDT = this`
Nit: to simplify the test cases, please remove `asNullable`, `typeName`, `equals` and `hashCode`.
ok
`}`
`override def serialize(obj: IExampleSubType): InternalRow = {`
Nit: Please remove this line. Thanks!
Not sure which line you mean... we need to override `serialize` in any case, I guess - or is it about a blank line?
`datum match {`
`  case row: InternalRow =>`
`    require(row.numFields == 1,`
`      s"VectorUDT.deserialize given row with length " +`
Please remove this unneeded `s` string interpolator. Thanks!
ok - replaced it with a fixed string
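For reference, the nit is that the `s` interpolator only does work when the string actually embeds a `$`-expression; on a plain literal it is dead weight. A tiny illustration (values here are made up for the example):

```scala
// The s interpolator is only needed when the string embeds expressions.
val n = 1
val plain  = "VectorUDT.deserialize given row with length "  // no $-expression: a plain literal suffices
val interp = s"given row with length $n"                     // interpolation actually used
```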
`override def typeName: String = "exampleSubType"`
`private[spark] override def asNullable: ExampleSubTypeUDT = this`
Nit: to simplify the test cases, please remove `asNullable`, `typeName`, `equals` and `hashCode` too.
done
`class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetTest {`
Nit: Please remove this empty line.
ok
It looks great! My comments are just to simplify the unit test cases. Normally, we want to make unit test cases as simple as possible. Thanks! LGTM except a few minor comments.
`}`
`test("SPARK-19311: UDFs disregard UDT type hierarchy") {`
`  UDTRegistration.register(classOf[IExampleBaseType].getName,`
With `SQLUserDefinedType`, there is no need to use `UDTRegistration`. We can remove these two lines.
I tend to leave them but remove the `@SQLUserDefinedType` annotation, so we have a test that uses `UDTRegistration`.
Oh, if you worry about that: we actually have `UDTRegistrationSuite` for test cases of `UDTRegistration`. I am fine with either `SQLUserDefinedType` or `UDTRegistration`.
ok
`override def hashCode(): Int = classOf[ExampleSubTypeUDT].getName.hashCode()`
`override def equals(other: Any): Boolean = other.isInstanceOf[ExampleSubTypeUDT]`
`equals` in `UserDefinedType` has a default implementation that calls `acceptsType`.
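A simplified stand-in (not the real Spark class) illustrating the point: with a default `equals` that delegates to `acceptsType`, a UDT subclass usually needs no `equals` override of its own. The class names below are made up for the sketch:

```scala
// Stand-in for UserDefinedType's equals/acceptsType relationship.
abstract class UdtLike {
  // Default acceptance: same runtime class.
  def acceptsType(other: UdtLike): Boolean = this.getClass == other.getClass
  // The default equals delegates to acceptsType, so subclasses inherit
  // sensible equality for free.
  override def equals(that: Any): Boolean = that match {
    case u: UdtLike => this.acceptsType(u)
    case _          => false
  }
  override def hashCode(): Int = getClass.getName.hashCode
}

class FooUDT extends UdtLike
class BarUDT extends UdtLike
```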
Right - I have removed `equals` anyway due to a previous comment of @gatorsmile.
Thanks for the valuable (and fast!) comments - I have worked them in.
Test build #71929 has finished for PR 16660 at commit
LGTM except one minor comment.
@viirya Which comment are you referring to? I thought I had included all of them ;-)
The remaining comment is: #16660 (comment)
LGTM except one comment. Thanks for working on this!
@gmoehler |
Thanks for pointing that out. I had overlooked this comment.
Test build #71980 has finished for PR 16660 at commit
LGTM |
Thanks! Merging to master/2.1 |
## What changes were proposed in this pull request?

acceptsType() in a UDT will not only accept the same type but also all base types.

## How was this patch tested?

Manual test using a set of generated UDTs, fixing acceptsType() in my user defined types.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: gmoehler <[email protected]>

Closes apache#16660 from gmoehler/master.
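The heart of the fix can be sketched with a simplified stand-in for `UserDefinedType` (the real change lives in `org.apache.spark.sql.types.UserDefinedType`; the stand-in below only illustrates the accept-base-types idea and is not Spark's actual class):

```scala
// Simplified stand-in for UserDefinedType with the fixed acceptsType:
// a UDT accepts another UDT when its user class is the same as, or a
// superclass/superinterface of, the other UDT's user class.
abstract class SimpleUDT[T] {
  def userClass: Class[T]
  def acceptsType(other: SimpleUDT[_]): Boolean =
    this.getClass == other.getClass ||
      this.userClass.isAssignableFrom(other.userClass)
}

trait IExampleBaseType { def field: Int }
trait IExampleSubType extends IExampleBaseType

class BaseUDT extends SimpleUDT[IExampleBaseType] {
  override val userClass: Class[IExampleBaseType] = classOf[IExampleBaseType]
}
class SubUDT extends SimpleUDT[IExampleSubType] {
  override val userClass: Class[IExampleSubType] = classOf[IExampleSubType]
}
```

With this, the base UDT accepts the sub UDT (so a UDF declared over the base type works on sub-type values), while the reverse direction is still rejected.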
…e rows in ScalaUDF as well

### What changes were proposed in this pull request?

This PR tries to address the comment: #28645 (comment)

It changes `canUpCast`/`canCast` to allow casting from a sub UDT to a base UDT, in order to allow UserDefinedType to use `ExpressionEncoder` to deserialize rows in ScalaUDF as well.

One thing worth mentioning: even though we allow casting from a sub UDT to a base UDT, `Cast` doesn't really perform the cast, because a sub UDT and its base UDT are still considered the same type (because of #16660), see:

https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala#L81-L86

https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala#L92-L95

Therefore, the optimizer rule `SimplifyCasts` will eliminate the cast in the end.

### Why are the changes needed?

Reduce the special-casing caused by `UserDefinedType` in `ResolveEncodersInUDF` and `ScalaUDF`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It should be covered by the test of `SPARK-19311`, which is also updated a little in this PR.

Closes #28920 from Ngone51/fix-udf-udt.

Authored-by: yi.wu <[email protected]>

Signed-off-by: Wenchen Fan <[email protected]>
*Update: I added a test case to https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala