@Ngone51 (Member) commented May 26, 2020

### What changes were proposed in this pull request?

This PR adds support for typed Scala UDFs to accept composite types of case classes, e.g. Seq[T], Array[T], Map[Int, T] (where T is a case class), as input parameter types.

### Why are the changes needed?

After #27937, typed Scala UDFs support case classes as input parameter types. However, they cannot accept composite types of case classes, such as Seq[T], Array[T], Map[Int, T] (where T is a case class), which confuses users (e.g. #27937 (comment)).

### Does this PR introduce any user-facing change?

Yes.

Run the query:

```scala
scala> case class Person(name: String, age: Int)
scala> Seq((1, Seq(Person("Jack", 5)))).toDF("id", "persons").withColumn("ages", udf{ s: Seq[Person] => s.head.age }.apply(col("persons"))).show
```

Before:


```
org.apache.spark.SparkException: Failed to execute user defined function($read$$Lambda$2861/628175152: (array<struct<name:string,age:int>>) => int)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1129)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
  at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:83)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$17.$anonfun$applyOrElse$69(Optimizer.scala:1492)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  ....
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person
  at $anonfun$res3$1(<console>:30)
  at $anonfun$res3$1$adapted(<console>:30)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$2(ScalaUDF.scala:156)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1126)
  ... 142 more
```

After:

```
+---+-----------+----+
| id|    persons|ages|
+---+-----------+----+
|  1|[[Jack, 5]]| [5]|
+---+-----------+----+
```

### How was this patch tested?

Added tests.

@Ngone51 (Member, Author) commented May 26, 2020

ping @koertkuipers @cloud-fan Please take a look, thanks!

```scala
row: Any => fromRow(row.asInstanceOf[InternalRow])
} else {
  val child = children(i)
  val attrs = new StructType().add(s"$child", child.dataType).toAttributes
```
Contributor:

`child.toString` can be expensive. How about `"child"`? The name doesn't matter anyway.
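
A sketch of the suggested change, mirroring the line in the diff above (illustrative only):

```scala
// Use a fixed name instead of child.toString; the attribute name is unused.
val attrs = new StructType().add("child", child.dataType).toAttributes
```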

```scala
encoder match {
  case Some(enc) =>
    if (enc.isSerializedAsStructForTopLevel) {
      val fromRow = enc.resolveAndBind().createDeserializer()
```
Contributor:

To be consistent, shall we bind with `child.dataType.asInstanceOf[StructType].toAttributes`?
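
A sketch of that suggestion, reusing the names from the diff above (illustrative only):

```scala
// Bind the encoder against the child's actual struct schema for consistency.
val attrs = child.dataType.asInstanceOf[StructType].toAttributes
val fromRow = enc.resolveAndBind(attrs).createDeserializer()
```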

```scala
  checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(2020) :: Nil)
}

test("case class as element type of Seq/Array") {
```
Contributor:

can we also test some special cases (see the sketch after this list):

  1. the catalyst schema has more fields than the case class, e.g. `struct<key: int, value: string, col: int>` and `case class TestData(key: Int, value: String)`
  2. the field order doesn't match, e.g. `struct<value: string, key: int>` and `case class TestData(key: Int, value: String)`
  3. the catalyst schema has missing fields, e.g. `struct<key: int>` and `case class TestData(key: Int, value: String)`
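
A hedged sketch of what such tests might look like; the `TestData` definition, data, and outcome comments are assumptions for illustration, not the tests actually added:

```scala
import org.apache.spark.sql.functions.{col, struct, udf}
// assumes spark.implicits._ (or a test suite's testImplicits) is in scope

case class TestData(key: Int, value: String)
val f = udf { d: TestData => d.key }

// 1. catalyst schema has more fields than the case class
Seq((1, "a", 2)).toDF("key", "value", "col")
  .select(f(struct(col("key"), col("value"), col("col"))))

// 2. the field order doesn't match
Seq(("a", 1)).toDF("value", "key")
  .select(f(struct(col("value"), col("key"))))

// 3. the catalyst schema has missing fields; presumably fails analysis
// (cf. the intercept[AnalysisException] snippet later in this thread)
Seq(Tuple1(1)).toDF("key")
  .select(f(struct(col("key"))))
```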

@SparkQA commented May 26, 2020

Test build #123125 has finished for PR 28645 at commit c1a2d1c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
  checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(2020) :: Nil)
}

test("case class as element type of Seq/Array") {
```
Contributor:

some Option tests would be good too.

Option in a simple Seq:

```scala
Seq(Seq(Some(1), None)).toDF.withColumn("value", udf { s: Seq[Option[Int]] => s.map(_.map(_ + 1)) }.apply(col("value")))
```

Option as a function argument:

```scala
Seq(None, Some(1), None).toDF.withColumn("value", udf { o: Option[Int] => o.map(_ + 1) }.apply(col("value")))
```

Note that a top-level Option to express nullability is a very common use case in particular, and it is supported by encoders. The equivalent in Dataset is:

```scala
Seq(None, Some(1), None).toDS.map { o: Option[Int] => o.map(_ + 1) }
```

@Ngone51 (Member, Author):

There's an implementation difference between udf and Dataset.map. So for the second case you mentioned, it only works in Dataset.map but fails in udf.

Contributor:

> There's an implementation difference between udf and Dataset.map. So for the second case you mentioned, it only works in Dataset.map but fails in udf.

If the goal is to stop people from writing their own implementations of udf, then the second case is also needed...

@Ngone51 (Member, Author):

@koertkuipers I've updated to support the second case.
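
With that update, the second case from the earlier comment is expected to work, e.g.:

```scala
Seq(None, Some(1), None).toDF
  .withColumn("value", udf { o: Option[Int] => o.map(_ + 1) }.apply(col("value")))
```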

@SparkQA commented May 27, 2020

Test build #123192 has finished for PR 28645 at commit bb26320.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • test("case class as generic type of Option")
  • case class TestData4(a: Int, b: Int, c: Int)
  • case class TestData5(value: String, key: Int)

@SparkQA commented May 28, 2020

Test build #123209 has finished for PR 28645 at commit 5e0f445.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 29, 2020

Test build #123267 has finished for PR 28645 at commit 8576d28.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
nullable: Boolean = true,
udfDeterministic: Boolean = true)
udfDeterministic: Boolean = true,
inputDeserializers: Seq[Option[Deserializer[_]]] = Nil)
```
Contributor:

Do we need the `inputEncoders` parameter anymore?

@SparkQA commented Jun 3, 2020

Test build #123478 has finished for PR 28645 at commit 86035fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member, Author) commented Jun 4, 2020

cc @cloud-fan Could you please take another look?

@SparkQA commented Jun 5, 2020

Test build #123550 has finished for PR 28645 at commit 86035fa.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

retest this please

@SparkQA commented Jun 8, 2020

Test build #123635 has finished for PR 28645 at commit 86035fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 9, 2020

Test build #123666 has finished for PR 28645 at commit 2527c69.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

retest this please

```scala
  createToScalaConverter(i, c.dataType)
}.toArray :+ CatalystTypeConverters.createToCatalystConverter(dataType)
  scalaConverter(i, c.dataType)
}.toArray :+ createToCatalystConverter(dataType)
```
Contributor:

Does the UDF return type support case classes?

@Ngone51 (Member, Author):

Yes.
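
A hedged sketch of a UDF returning a case class (the names here are illustrative, not from the patch):

```scala
import org.apache.spark.sql.functions.{col, udf}
// assumes spark.implicits._ is in scope

case class Person(name: String, age: Int)

// A typed Scala UDF returning a case class produces a struct column.
val makePerson = udf { (name: String, age: Int) => Person(name, age) }
Seq(("Jack", 5)).toDF("name", "age")
  .select(makePerson(col("name"), col("age")).as("person"))
```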

```scala
val initArg = if (CatalystTypeConverters.isPrimitive(dt)) {
// Check `inputPrimitives` when it's not empty in order to figure out the Option
// type as non primitive type, e.g., Option[Int]. Fall back to `isPrimitive` when
// `inputPrimitives` is empty for other cases, e.g., Java UDF, untyped Scala UDF
```
Contributor:

So untyped Scala UDF doesn't support Option?

@Ngone51 (Member, Author):

Yeah. We require an encoder to support Option, but an untyped Scala UDF can't provide the encoder.
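
A sketch of the distinction, assuming Spark 3.0 semantics:

```scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

// Typed: the encoder is derived from the TypeTag, so Option[Int] is understood.
val typed = udf { o: Option[Int] => o.map(_ + 1) }

// Untyped: only a DataType is supplied, so there is no encoder and no Option
// support (this overload is also guarded by spark.sql.legacy.allowUntypedScalaUDF).
val untyped = udf((i: Int) => i + 1, IntegerType)
```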

s"Object $argTerm = ${eval.isNull} ? null : $convertersTerm[$i].apply(${eval.value});"
s"""
|Object $argTerm = null;
|// handle the top level Option type specifically
Contributor:

What's special about the top-level Option?

@Ngone51 (Member, Author):

For a top-level Option, e.g. Option[T], its internal data type is T. However, a udf always requires the external data type for its input values. So when ScalaUDF receives a null value of type T from the child, it needs to convert it to None instead of simply passing in the null value as it does for other nullable data types.
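
For example:

```scala
// The null cell reaches the function as None rather than as a bare null,
// and a None result is written back out as null.
val f = udf { o: Option[Int] => o.map(_ + 1) }
Seq(Some(1), None).toDF("value").select(f(col("value")))
```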

@SparkQA commented Jun 18, 2020

Test build #124212 has finished for PR 28645 at commit 1c82558.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 18, 2020

Test build #124221 has finished for PR 28645 at commit 3e97fa5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
  checkAnswer(df.select(myUdf(Column("col"))), Row(100) :: Row(null) :: Nil)
}

test("top level Option case class") {
```
Contributor:

This is already tested in `case class as generic type of Option`.

```scala
val df = spark.range(1)
  .select(lit(50).as("a"))
  .select(struct("a").as("col"))
val error = intercept[AnalysisException] (df.select(myUdf(Column("col"))))
```
Contributor:

nit: no space between `intercept[AnalysisException]` and `(df.select...`

@SparkQA commented Jun 19, 2020

Test build #124248 has finished for PR 28645 at commit e6bb55d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 19, 2020

Test build #124260 has finished for PR 28645 at commit f29a62a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member, Author) commented Jun 19, 2020

Jenkins, retest this please.

@SparkQA commented Jun 19, 2020

Test build #124267 has finished for PR 28645 at commit f29a62a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

cloud-fan closed this in 5ee5cfd on Jun 19, 2020
@Ngone51 (Member, Author) commented Jun 19, 2020

thanks all!!

cloud-fan pushed a commit that referenced this pull request Jun 24, 2020
…e rows in ScalaUDF as well

### What changes were proposed in this pull request?

This PR tries to address the comment: #28645 (comment)
It changes `canUpCast`/`canCast` to allow casting from a sub UDT to a base UDT, in order to allow UserDefinedType to use `ExpressionEncoder` to deserialize rows in ScalaUDF as well.

One thing worth mentioning: even though we allow casting from a sub UDT to a base UDT, `Cast` doesn't actually perform the cast, because sub UDT and base UDT are currently considered the same type (because of #16660), see:

https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala#L81-L86

https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala#L92-L95

Therefore, the optimizer rule `SimplifyCasts` will eliminate the cast in the end.

### Why are the changes needed?

Reduce the special case caused by `UserDefinedType` in `ResolveEncodersInUDF` and `ScalaUDF`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It should be covered by the test of `SPARK-19311`, which is also updated a little in this PR.

Closes #28920 from Ngone51/fix-udf-udt.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Apr 28, 2024
### What changes were proposed in this pull request?

This PR fixes a correctness issue by moving the batch that resolves udf decoders to after the `UpdateNullability` batch. This means we now derive the decoder with the updated attributes, which fixes the issue.

I think the issue has existed since #28645, when udf support for case class arguments was added, so it should be present in all currently supported versions.

### Why are the changes needed?

Currently the following code
```
scala> val ds1 = Seq(1).toDS()
     | val ds2 = Seq[Int]().toDS()
     | val f = udf[Tuple1[Option[Int]],Tuple1[Option[Int]]](identity)
     | ds1.join(ds2, ds1("value") === ds2("value"), "left_outer").select(f(struct(ds2("value")))).collect()
val ds1: org.apache.spark.sql.Dataset[Int] = [value: int]
val ds2: org.apache.spark.sql.Dataset[Int] = [value: int]
val f: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2481/0x00007f7f50961f086b1a2c9f,StructType(StructField(_1,IntegerType,true)),List(Some(class[_1[0]: int])),Some(class[_1[0]: int]),None,true,true)
val res0: Array[org.apache.spark.sql.Row] = Array([[0]])
```
results in a row containing `0`, which is incorrect: the value should be `null`. Removing the udf call
```
scala> ds1.join(ds2, ds1("value") === ds2("value"), "left_outer").select(struct(ds2("value"))).collect()
val res1: Array[org.apache.spark.sql.Row] = Array([[null]])
```
gives the correct value.

### Does this PR introduce _any_ user-facing change?

Yes, fixes a correctness issue when using ScalaUDFs.

### How was this patch tested?

Existing and new unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46156 from eejbyfeldt/SPARK-47927.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed two further commits that referenced this pull request on Apr 28, 2024, both with the same message as the commit above, cherry picked from commit 8b8ea60 and signed off by Wenchen Fan.
turboFei pushed a commit to turboFei/spark that referenced this pull request on Nov 6, 2025, again with the same message, cherry picked from commit 8b8ea60.