From 6c0971fe68a429bf8dc24e21c16e3d0e137a1a0a Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 29 Nov 2018 21:38:51 +0800
Subject: [PATCH 1/3] Do not use case class as public API (UserDefinedFunction)

---
 project/MimaExcludes.scala                    |   6 +-
 .../sql/expressions/UserDefinedFunction.scala | 121 +++++++++---------
 .../org/apache/spark/sql/functions.scala      |   2 +-
 3 files changed, 63 insertions(+), 66 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fcef424c330f..1c83cf5860c5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -227,7 +227,11 @@ object MimaExcludes {
       case ReversedMissingMethodProblem(meth) =>
         !meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2")
       case _ => true
-    }
+    },
+
+    // [SPARK-26216][SQL] Do not use case class as public API (UserDefinedFunction)
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
+    ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.expressions.UserDefinedFunction")
   )

   // Exclude rules for 2.4.x
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index 58a942afe28c..fa2958c3e5b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -38,25 +38,14 @@ import org.apache.spark.sql.types.DataType
  * @since 1.3.0
  */
 @Stable
-case class UserDefinedFunction protected[sql] (
-    f: AnyRef,
-    dataType: DataType,
-    inputTypes: Option[Seq[DataType]]) {
-
-  private var _nameOption: Option[String] = None
-  private var _nullable: Boolean = true
-  private var _deterministic: Boolean = true
-
-  // This is a `var` instead of in the constructor for backward compatibility of this case class.
-  // TODO: revisit this case class in Spark 3.0, and narrow down the public surface.
-  private[sql] var nullableTypes: Option[Seq[Boolean]] = None
+trait UserDefinedFunction {

   /**
    * Returns true when the UDF can return a nullable value.
    *
    * @since 2.3.0
    */
-  def nullable: Boolean = _nullable
+  def nullable: Boolean

   /**
    * Returns true iff the UDF is deterministic, i.e. the UDF produces the same output given the same
@@ -64,7 +53,7 @@ case class UserDefinedFunction protected[sql] (
    *
    * @since 2.3.0
    */
-  def deterministic: Boolean = _deterministic
+  def deterministic: Boolean

   /**
    * Returns an expression that invokes the UDF, using the given arguments.
@@ -72,80 +61,85 @@ case class UserDefinedFunction protected[sql] (
    * @since 1.3.0
    */
   @scala.annotation.varargs
-  def apply(exprs: Column*): Column = {
+  def apply(exprs: Column*): Column
+
+  /**
+   * Updates UserDefinedFunction with a given name.
+   *
+   * @since 2.3.0
+   */
+  def withName(name: String): UserDefinedFunction
+
+  /**
+   * Updates UserDefinedFunction to non-nullable.
+   *
+   * @since 2.3.0
+   */
+  def asNonNullable(): UserDefinedFunction
+
+  /**
+   * Updates UserDefinedFunction to nondeterministic.
+   *
+   * @since 2.3.0
+   */
+  def asNondeterministic(): UserDefinedFunction
+}
+
+private[sql] case class SparkUserDefinedFunction(
+    f: AnyRef,
+    dataType: DataType,
+    inputTypes: Option[Seq[DataType]],
+    nullableTypes: Option[Seq[Boolean]],
+    name: Option[String] = None,
+    nullable: Boolean = true,
+    deterministic: Boolean = true) extends UserDefinedFunction {
+
+  @scala.annotation.varargs
+  override def apply(exprs: Column*): Column = {
     // TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()`
     // and `nullableTypes` is always set.
-    if (nullableTypes.isEmpty) {
-      nullableTypes = Some(ScalaReflection.getParameterTypeNullability(f))
-    }
     if (inputTypes.isDefined) {
       assert(inputTypes.get.length == nullableTypes.get.length)
     }

+    val inputsNullSafe = if (nullableTypes.isEmpty) {
+      ScalaReflection.getParameterTypeNullability(f)
+    } else {
+      nullableTypes.get
+    }
+
     Column(ScalaUDF(
       f,
       dataType,
       exprs.map(_.expr),
-      nullableTypes.get,
+      inputsNullSafe,
       inputTypes.getOrElse(Nil),
-      udfName = _nameOption,
-      nullable = _nullable,
-      udfDeterministic = _deterministic))
-  }
-
-  private def copyAll(): UserDefinedFunction = {
-    val udf = copy()
-    udf._nameOption = _nameOption
-    udf._nullable = _nullable
-    udf._deterministic = _deterministic
-    udf.nullableTypes = nullableTypes
-    udf
+      udfName = name,
+      nullable = nullable,
+      udfDeterministic = deterministic))
   }

-  /**
-   * Updates UserDefinedFunction with a given name.
-   *
-   * @since 2.3.0
-   */
-  def withName(name: String): UserDefinedFunction = {
-    val udf = copyAll()
-    udf._nameOption = Option(name)
-    udf
+  override def withName(name: String): UserDefinedFunction = {
+    copy(name = Option(name))
   }

-  /**
-   * Updates UserDefinedFunction to non-nullable.
-   *
-   * @since 2.3.0
-   */
-  def asNonNullable(): UserDefinedFunction = {
+  override def asNonNullable(): UserDefinedFunction = {
     if (!nullable) {
       this
     } else {
-      val udf = copyAll()
-      udf._nullable = false
-      udf
+      copy(nullable = false)
     }
   }

-  /**
-   * Updates UserDefinedFunction to nondeterministic.
-   *
-   * @since 2.3.0
-   */
-  def asNondeterministic(): UserDefinedFunction = {
-    if (!_deterministic) {
+  override def asNondeterministic(): UserDefinedFunction = {
+    if (!deterministic) {
       this
     } else {
-      val udf = copyAll()
-      udf._deterministic = false
-      udf
+      copy(deterministic = false)
     }
   }
 }

-// We have to use a name different than `UserDefinedFunction` here, to avoid breaking the binary
-// compatibility of the auto-generate UserDefinedFunction object.
 private[sql] object SparkUserDefinedFunction {

   def create(
@@ -157,8 +151,7 @@ private[sql] object SparkUserDefinedFunction {
     } else {
       Some(inputSchemas.map(_.get.dataType))
     }
-    val udf = new UserDefinedFunction(f, dataType, inputTypes)
-    udf.nullableTypes = Some(inputSchemas.map(_.map(_.nullable).getOrElse(true)))
-    udf
+    val nullableTypes = Some(inputSchemas.map(_.map(_.nullable).getOrElse(true)))
+    SparkUserDefinedFunction(f, dataType, inputTypes, nullableTypes)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index efa8f8526387..33186f778d86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4259,7 +4259,7 @@ object functions {
   def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = {
     // TODO: should call SparkUserDefinedFunction.create() instead but inputSchemas is currently
     // unavailable. We may need to create type-safe overloaded versions of udf() methods.
-    new UserDefinedFunction(f, dataType, inputTypes = None)
+    SparkUserDefinedFunction(f, dataType, inputTypes = None, nullableTypes = None)
   }

   /**
From 10138f5f2f9799b9963e9acb0eca1a98376b6539 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 29 Nov 2018 23:08:05 +0800
Subject: [PATCH 2/3] address comments

---
 .../apache/spark/sql/expressions/UserDefinedFunction.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index fa2958c3e5b9..f88e0e0f299d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.DataType
  * @since 1.3.0
  */
 @Stable
-trait UserDefinedFunction {
+sealed trait UserDefinedFunction {

   /**
    * Returns true when the UDF can return a nullable value.
@@ -102,10 +102,8 @@ private[sql] case class SparkUserDefinedFunction(
       assert(inputTypes.get.length == nullableTypes.get.length)
     }

-    val inputsNullSafe = if (nullableTypes.isEmpty) {
+    val inputsNullSafe = nullableTypes.getOrElse {
       ScalaReflection.getParameterTypeNullability(f)
-    } else {
-      nullableTypes.get
     }

     Column(ScalaUDF(
From ad6605e5f049cc21ed5754b754b695bcac6d4502 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 30 Nov 2018 16:20:30 +0800
Subject: [PATCH 3/3] add migration guide

---
 docs/sql-migration-guide-upgrade.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index e48125a0972b..787f4bcbbea8 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -31,6 +31,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false`.

+  - Spark applications built with Spark 2.4 and earlier that call methods of `UserDefinedFunction` need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
+
 ## Upgrading From Spark SQL 2.3 to 2.4

   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
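
As a usage reference for the migration note in PATCH 3/3, the sketch below shows the public surface that remains after this change: UDFs are created through `functions.udf` and adjusted only via the trait methods `withName`, `asNonNullable`, and `asNondeterministic`. It is a minimal sketch, assuming Spark 3.0 binaries and a local session; the object name, the `plus_one` UDF, and the sample data are illustrative assumptions, not code from the patch.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    // Illustrative only: the names and data below are assumptions for this sketch.
    object UdfSurfaceSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("udf-surface-sketch")
          .getOrCreate()
        import spark.implicits._

        // Build and tune a UDF strictly through the sealed trait's methods.
        val plusOne = udf((x: Int) => x + 1)
          .withName("plus_one")   // name shown in query plans
          .asNonNullable()        // declare the result non-nullable

        val df = Seq(1, 2, 3).toDF("n")
        df.select(plusOne($"n").as("n_plus_one")).show()

        // Case-class-specific operations on UserDefinedFunction, such as `copy`
        // or constructor pattern matching, no longer compile against the sealed
        // trait; such call sites need source changes, not just re-compilation.
        spark.stop()
      }
    }

Code that only constructs UDFs this way should re-compile cleanly against 3.0; the MiMa excludes in PATCH 1/3 acknowledge the intentional binary break rather than preserve compatibility.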