
Conversation

@Ngone51 (Member) commented Mar 17, 2020

### What changes were proposed in this pull request?

To support case class parameters for typed Scala UDFs, e.g.

```scala
case class TestData(key: Int, value: String)
val f = (d: TestData) => d.key * d.value.toInt
val myUdf = udf(f)
val df = Seq(("data", TestData(50, "2"))).toDF("col1", "col2")
checkAnswer(df.select(myUdf(Column("col2"))), Row(100) :: Nil)
```

### Why are the changes needed?

Currently, Spark UDFs can only work on data types like java.lang.String, o.a.s.sql.Row, Seq[_], etc. This is inconvenient if a user wants to apply an operation on one column and the column is a struct type: the data must be accessed through a Row object instead of a domain object, as in Dataset operations. It would be great if UDFs could work on the types supported by Dataset, e.g. case classes.

And here's a benchmark result comparing the case class input with the Row input:

```scala
// case class:  58ms 65ms 59ms 64ms 61ms
// row:         59ms 64ms 73ms 84ms 69ms
val f1 = (d: TestData) => s"${d.key}, ${d.value}"
val f2 = (r: Row) => s"${r.getInt(0)}, ${r.getString(1)}"
val udf1 = udf(f1)
// set spark.sql.legacy.allowUntypedScalaUDF=true
val udf2 = udf(f2, StringType)

val df = spark.range(100000).selectExpr("cast (id as int) as id")
    .select(struct('id, lit("str")).as("col"))
df.cache().collect()

// warmup to exclude some extra influence
df.select(udf1('col)).write.mode(SaveMode.Overwrite).format("noop").save()
df.select(udf2('col)).write.mode(SaveMode.Overwrite).format("noop").save()

var start = System.currentTimeMillis()
df.select(udf1('col)).write.mode(SaveMode.Overwrite).format("noop").save()
println(System.currentTimeMillis() - start)

start = System.currentTimeMillis()
df.select(udf2('col)).write.mode(SaveMode.Overwrite).format("noop").save()
println(System.currentTimeMillis() - start)
```

### Does this PR introduce any user-facing change?

Yes. Users can now use a typed Scala UDF with a case class as an input parameter.

### How was this patch tested?

Added unit tests.

@Ngone51 (Member Author) commented Mar 17, 2020

cc @cloud-fan

@SparkQA commented Mar 17, 2020

Test build #119938 has finished for PR 27937 at commit 2b186bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 18, 2020

Test build #119983 has finished for PR 27937 at commit d600cf7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private def createToScalaConverter(i: Int, dataType: DataType): Any => Any = {
val encoder = inputEncoders(i)
encoder.isSerializedAsStructForTopLevel match {
Contributor

it's weird to pattern match on a boolean, can we write if/else?
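A minimal sketch of what that if/else could look like, reusing the names from the hunk above (the `CatalystTypeConverters` fallback for non-struct inputs is an assumption, not the PR's final code):

```scala
// Hedged sketch: branch on the encoder flag instead of pattern matching the
// boolean, and resolve the encoder once, outside the per-row lambda.
private def createToScalaConverter(i: Int, dataType: DataType): Any => Any = {
  val encoder = inputEncoders(i)
  if (encoder.isSerializedAsStructForTopLevel) {
    // deserialize the struct input into the user's domain object (e.g. a case class)
    val enc = encoder.resolveAndBind()
    (row: Any) => enc.fromRow(row.asInstanceOf[InternalRow])
  } else {
    // fall back to the default Catalyst-to-Scala conversion
    CatalystTypeConverters.createToScalaConverter(dataType)
  }
}
```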

private def createToScalaConverter(i: Int, dataType: DataType): Any => Any = {
val encoder = inputEncoders(i)
encoder.isSerializedAsStructForTopLevel match {
case true => r: Any => encoder.resolveAndBind().fromRow(r.asInstanceOf[InternalRow])
Contributor

We shouldn't call resolveAndBind for each input row

val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas: Seq[Option[ScalaReflection.Schema]] = Nil
val udf = SparkUserDefinedFunction(func, dataType, inputSchemas).withName(name)
val inputEncoders: Seq[ExpressionEncoder[_]] = Nil
Contributor

can we update the script that generates these methods?

f: AnyRef,
dataType: DataType,
inputSchemas: Seq[Option[ScalaReflection.Schema]],
inputEncoders: Seq[ExpressionEncoder[_]] = Nil,
Contributor

Shall we have a simple inputInfos: Seq[Option[(ScalaReflection.Schema, ExpressionEncoder)]]?

Contributor

or is it possible to get the schema and nullability from the encoder?

val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1])).toOption :: Try(ScalaReflection.schemaFor(typeTag[A2])).toOption :: Nil
val udf = SparkUserDefinedFunction(f, dataType, inputSchemas)
val inputEncoders: Seq[ExpressionEncoder[_]] = ExpressionEncoder[A1]() :: ExpressionEncoder[A2]() :: Nil
Contributor

If the type is Any, we may fail to create the encoder, and we should catch that, like what we've done for ScalaReflection.schemaFor above.
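A hedged sketch of that guard, mirroring the Try(...).toOption pattern already used for schemaFor in the hunk above (scala.util.Try assumed to be in scope):

```scala
// Sketch: if A1/A2 is Any (or otherwise has no encoder), ExpressionEncoder()
// throws, so wrap creation in Try and fall back to None, as with schemaFor.
val inputEncoders: Seq[Option[ExpressionEncoder[_]]] =
  Try(ExpressionEncoder[A1]()).toOption ::
  Try(ExpressionEncoder[A2]()).toOption :: Nil
```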

checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(500) :: Nil)
}

test("input case class parameter and return case class ") {
Contributor

can we test nested case class as well?

@Ngone51 (Member Author) commented Mar 18, 2020

@cloud-fan Oh... I missed your review comments after the updates. The comments marked as outdated haven't really been addressed. I'll update again later.

@SparkQA commented Mar 18, 2020

Test build #119997 has finished for PR 27937 at commit 867ad06.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 Ngone51 changed the title [WIP][SPARK-30127][SQL] Support case class parameter for typed Scala UDF [SPARK-30127][SQL] Support case class parameter for typed Scala UDF Mar 19, 2020
@SparkQA commented Mar 19, 2020

Test build #120018 has finished for PR 27937 at commit 23ca098.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member Author) commented Mar 19, 2020

retest this please.

@SparkQA commented Mar 19, 2020

Test build #120041 has finished for PR 27937 at commit 23ca098.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 19, 2020

Test build #120061 has finished for PR 27937 at commit 842d6fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} else {
val encoder = inputEncoders(i)
if (encoder.isDefined && encoder.get.isSerializedAsStructForTopLevel) {
val enc = resolvedEnc.getOrElseUpdate(i, encoder.get.resolveAndBind())
Contributor

why do we need resolvedEnc? I think we can simply write

val enc = encoder.get.resolveAndBind()
(row: Any) => enc.fromRow(row.asInstanceOf[InternalRow])

Member Author

makes sense. updated.

* other hand, Java UDFs can only have boxed types, thus this parameter will
* always be all false.
*/
def inputPrimitives: Seq[Boolean] = {
Contributor

can this be Nil?

Contributor

I think we need to return children.map(_ => false) if inputEncoders is empty.

Member Author

It can be Nil. Previously, the Java UDF returned children.map(_ => false), which indeed has the same effect as Nil. And the untyped Scala UDF always passes in Nil as well.

But a typed Scala UDF will always have inputPrimitives and inputTypes.
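A hedged sketch of the shape being discussed (the primitive test via schema nullability is an assumption: for a flat input type, the encoder's single schema field is non-nullable exactly when the Scala type is primitive):

```scala
// Sketch: with no encoders (Java UDF, untyped Scala UDF) return Nil so the
// analyzer applies no null-handling; otherwise derive the flag per input.
def inputPrimitives: Seq[Boolean] = {
  if (inputEncoders.isEmpty) {
    Nil
  } else {
    inputEncoders.map {
      case Some(enc) if !enc.isSerializedAsStructForTopLevel =>
        // a flat input is primitive iff its single schema field is non-nullable
        !enc.schema.head.nullable
      case _ =>
        false // structs (case classes) and unknown types are never primitive
    }
  }
}
```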

* better to use Option of Seq[DataType] so we can use "None" as the case for no
* type coercion. However, that would require more refactoring of the codebase.
*/
def inputTypes: Seq[AbstractDataType] = {
Contributor

same here.

Contributor

unless we guarantee inputEncoders always has the same length as children.

Member Author

Similarly, the input types of Java UDF and untyped Scala UDF are always Nil.

@SparkQA commented Mar 23, 2020

Test build #120173 has finished for PR 27937 at commit 8e82f3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* The analyzer should be aware of Scala primitive types so as to make the
* UDF return null if there is any null input value of these types. On the
* other hand, Java UDFs can only have boxed types, thus this parameter will
* always be all false.
Contributor

We need to make sure the comment is accurate: "Java UDFs can only have boxed types, thus this parameter will always be all false." This is wrong now.

I agree that Nil is fine in this case, but the comment needs to be updated.

@cloud-fan (Contributor) left a comment

LGTM except one comment

@SparkQA commented Mar 23, 2020

Test build #120188 has finished for PR 27937 at commit b0b298e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master/3.0!

@cloud-fan cloud-fan closed this in f6ff7d0 Mar 24, 2020
cloud-fan pushed a commit that referenced this pull request Mar 24, 2020

Closes #27937 from Ngone51/udf_caseclass_support.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit f6ff7d0)
Signed-off-by: Wenchen Fan <[email protected]>
@Ngone51 (Member Author) commented Mar 25, 2020

thanks!

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020

Closes apache#27937 from Ngone51/udf_caseclass_support.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
private[spark] case class SparkUserDefinedFunction(
f: AnyRef,
dataType: DataType,
inputSchemas: Seq[Option[ScalaReflection.Schema]],
@koertkuipers (Contributor) commented May 6, 2020

quite a lot of outside libraries use this constructor (or, to be precise, the older equivalent version with DataType instead of Schema that used to be in UserDefinedFunction). see for example:
https://github.com/salesforce/TransmogrifAI/blob/master/features/src/main/scala/com/salesforce/op/features/FeatureSparkTypes.scala#L269

can you suggest a workaround? i see no easy way to go from DataType to ExpressionEncoder...

Member Author

Hi @koertkuipers, the example seems to use UserDefinedFunction instead of SparkUserDefinedFunction. And since Spark 3.0, UserDefinedFunction has been changed from a case class to an abstract class, see SPARK-26216.

cc @cloud-fan

Contributor

hi @Ngone51 yes changing it to use SparkUserDefinedFunction instead of UserDefinedFunction was easy. but the change from inputSchemas to inputEncoders poses a bit of a challenge.

Contributor

It's not going from DataType to ExpressionEncoder, which is impossible. It's going from T <: FeatureType : TypeTag to ExpressionEncoder[T]. I think you need to adjust FeatureSparkTypes to support creating ExpressionEncoder. This is the cost of relying on internal APIs.
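For illustration, a minimal sketch of such a factory (encoderFor is a made-up name; only the TypeTag is needed):

```scala
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Given a TypeTag for T, an ExpressionEncoder can be materialized directly;
// FeatureSparkTypes would need to thread a TypeTag through to a call like this.
def encoderFor[T: TypeTag]: ExpressionEncoder[T] = ExpressionEncoder[T]()
```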

@koertkuipers (Contributor) commented May 10, 2020

while it's nice to see case classes supported, i was surprised to find that another feature of Encoder is not supported: Option to indicate a udf argument could be null. for example one could define a udf for (Option[String], Int) => String to explicitly handle the case where the first input argument is null (or should i say it's now None).
a better example is (String, Option[Int]) => String to work around the udf behavior that if primitives are null the udf is not called and the output is also null.

especially when designing generic systems that use udfs this becomes really important. say you write something that does transform (X, Y) => X for generic types X and Y (with typetags). now say Y could be null (the udf could be called after a left join for example where Y could be joined in). the behavior would now change based on the concrete type of Y... for Strings nulls would get passed in to the udf while for Ints the udf would be skipped. i don't think anyone would want to deal with that. so instead you want to write a udf for (X, Option[Y]) => X in this case.

@koertkuipers (Contributor) commented May 10, 2020

in my testing i also found the api now to be somewhat inconsistent/confusing. basically sometimes CatalystTypeConverters.createToScalaConverter is used and sometimes ExpressionEncoder.fromRow, depending solely on whether the argument is a top-level struct or not. but CatalystTypeConverters.createToScalaConverter and ExpressionEncoder.fromRow behave very differently, leading to inconsistent application.

for example this (contrived) usage works:

```scala
case class Person(name: String, age: Option[Int])
Seq((1, Person("john", Some(55))), (2, Person("mary", None))).toDF("id", "person").withColumn("age", udf { p: Person => p.age }.apply(col("person")))
```

but this does not:

```scala
Seq((1, Seq(Person("john", Some(55)))), (2, Seq(Person("mary", None)))).toDF("id", "persons").withColumn("ages", udf { s: Seq[Person] => s.map(_.age) }.apply(col("persons")))
```

and while Option works nicely in the Person case class (and also in tuples), Option does not work in a simple Seq:

```scala
Seq(Seq(Some(1), None)).toDF.withColumn("value", udf { s: Seq[Option[Int]] => s.map(_.map(_ + 1)) }.apply(col("value")))
```

and Option also does not work for a function argument:

```scala
Seq(None, Some(1), None).toDF.withColumn("value", udf { o: Option[Int] => o.map(_ + 1) }.apply(col("value")))
```

this inconsistency will be hard to understand. and this inconsistency is not limited to Options. it also applies to many other things. for example tuples inside maps will not work (you still have to use Row there) but tuples inside maps will work if the map is inside a case class. that is hard to explain to a user...

finally let me give some background on why i am a little nervous about this change...
spark udfs have been somewhat limited for a long time: no support for case classes, tuples, options. so many libraries have worked around that by defining their own udfs on top of SparkUserDefinedFunction. we do this inhouse too. it is easy to do this with type classes thanks to the composability of inputSchemas.
so now you replaced inputSchemas with inputEncoders. but ExpressionEncoder and TypeTags are not composable. i do not see a way for us to build on top of this for our own inhouse udfs. so then the alternative for us is to abandon our inhouse udfs and start using spark's udfs again, which now support case classes and tuples, which is nice! but the inconsistency of the api and the lack of support for Option makes that currently not viable to me. i realize this is a spark internal api and this is entirely my own problem. but i thought it was worth pointing out because i suspect i am not the only one that has done this. i think this is one of the more typical workarounds people have done using spark (and i am aware of multiple implementations of this workaround).

sorry for the long post(s)

Contributor

inputSchemas is composable but it contains too little information; that's why the Spark UDF was so limited before.

Our goal is to make Spark UDF powerful enough so that people don't need to use internal APIs to build inhouse UDFs. But you are right that the support is not complete. @Ngone51 Can you take a closer look and see how to make it complete?

BTW, if you do need to keep your inhouse UDFs for a while, there is a way to create an ExpressionEncoder from a Seq[DataType], by calling RowEncoder.apply. It only supports standard Spark external types, i.e. Row, not case classes, which is the same as older versions of Spark.
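A small sketch of that workaround (the two-field schema here is a made-up example):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.types._

// Build an encoder from plain DataTypes; the external type is Row, not a
// case class, matching pre-3.0 UDF behavior.
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))
val rowEnc: ExpressionEncoder[Row] = RowEncoder(schema)
```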

Member Author

@Ngone51 Can you take a closer look and see how to make it complete?

Yea, I'm looking into it.

Contributor

@cloud-fan thanks i will take a look at RowEncoder

cloud-fan pushed a commit that referenced this pull request Jun 19, 2020
…a UDF

### What changes were proposed in this pull request?

This PR adds support for typed Scala UDFs to accept composed types of case classes, e.g. Seq[T], Array[T], Map[Int, T] (assuming T is a case class type), as the input parameter type.

### Why are the changes needed?

After #27937, the typed Scala UDF supports a case class as its input parameter type. However, it cannot accept composed types of case classes, such as Seq[T], Array[T], Map[Int, T] (assuming T is a case class type), which causes confusion (e.g. #27937 (comment)) for users.

### Does this PR introduce _any_ user-facing change?

Yes.

Run the query:

```
scala> case class Person(name: String, age: Int)
scala> Seq((1, Seq(Person("Jack", 5)))).toDF("id", "persons").withColumn("ages", udf{ s: Seq[Person] => s.head.age }.apply(col("persons"))).show

```

Before:

```

org.apache.spark.SparkException: Failed to execute user defined function($read$$Lambda$2861/628175152: (array<struct<name:string,age:int>>) => int)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1129)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
  at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:83)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$17.$anonfun$applyOrElse$69(Optimizer.scala:1492)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

....

Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person
  at $anonfun$res3$1(<console>:30)
  at $anonfun$res3$1$adapted(<console>:30)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$2(ScalaUDF.scala:156)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1126)
  ... 142 more
```

After:
```
+---+-----------+----+
| id|    persons|ages|
+---+-----------+----+
|  1|[[Jack, 5]]| [5]|
+---+-----------+----+
```

### How was this patch tested?

Added tests.

Closes #28645 from Ngone51/impr-udf.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>