2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -31,6 +31,8 @@ displayTitle: Spark SQL Upgrading Guide

- In Spark version 2.4 and earlier, the `SET` command works without any warning even if the specified key is for a `SparkConf` entry; it has no effect because the command does not update `SparkConf`, and this behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable the check by setting `spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false` (see the sketch after this list).

- Spark applications built with Spark 2.4 and earlier that call methods of `UserDefinedFunction` need to be recompiled against Spark 3.0, as they are not binary compatible with it.
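
A minimal sketch of the stricter `SET` check described above (the session setup is illustrative, and the exact failure mode in 3.0 is an assumption based on the note):

```scala
import org.apache.spark.sql.SparkSession

object SetCommandCheckSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("set-command-check")
      // Restores the permissive 2.4 behavior; without this line the SET below
      // fails in 3.0, since spark.executor.memory is a SparkConf entry.
      .config("spark.sql.legacy.execution.setCommandRejectsSparkConfs", "false")
      .getOrCreate()

    // A silent no-op in 2.4 (and in 3.0 with the legacy flag): SET never
    // updates SparkConf entries at runtime.
    spark.sql("SET spark.executor.memory=4g")

    spark.stop()
  }
}
```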

## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the second parameter to the `array_contains` function is implicitly promoted to the element type of the first, array-typed parameter. This type promotion can be lossy and may cause `array_contains` to return a wrong result. This has been addressed in 2.4 by employing a safer type-promotion mechanism. The resulting behavior changes are illustrated in the table below.
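
The illustrating table is elided from this hunk; as a hedged stand-in, assuming an active `spark` session, a query of the affected shape (the concrete 2.3 vs. 2.4 results are assumptions based on the description above):

```scala
// 2.3: the Double argument could be lossily cast to the array's Int element
// type (1.34 -> 1), so this could wrongly return true.
// 2.4: the safer promotion compares both sides as Double, returning false.
spark.sql("SELECT array_contains(array(1), 1.34)").show()
```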
6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -227,7 +227,11 @@ object MimaExcludes {
case ReversedMissingMethodProblem(meth) =>
!meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2")
case _ => true
}
},

// [SPARK-26216][SQL] Do not use case class as public API (UserDefinedFunction)
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
Member: Can we get rid of this in #23351?

ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.expressions.UserDefinedFunction")
)

// Exclude rules for 2.4.x
sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -38,114 +38,106 @@ import org.apache.spark.sql.types.DataType
* @since 1.3.0
*/
@Stable
Member: I'm +1 for this PR, but I'm just wondering whether this `@Stable` tag with the `@since 1.3.0` tag is still valid here. The previous case class was stable until 2.4.x, and the new trait will be stable from 3.0, but the stability is broken once at 3.0.0. Did I understand correctly?

Member: Is it better to change it to `@Stable` with `@since 3.0.0`?

Member: Yeah, actually I was wondering about the same thing.

Member: I'd go ahead and leave the Since version. The API is essentially unchanged, though there are some marginal breaking compile-time changes. But the same is true of many things we are changing in 3.0. I've tagged the JIRA with release-notes and will add a blurb about the change.

Contributor Author: It's not a new API anyway; it would be weird to change `@since` to 3.0.

Member: Got it. Thank you, @HyukjinKwon, @srowen, @cloud-fan.

case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {

private var _nameOption: Option[String] = None
private var _nullable: Boolean = true
private var _deterministic: Boolean = true

// This is a `var` instead of in the constructor for backward compatibility of this case class.
// TODO: revisit this case class in Spark 3.0, and narrow down the public surface.
private[sql] var nullableTypes: Option[Seq[Boolean]] = None
sealed trait UserDefinedFunction {

/**
* Returns true when the UDF can return a nullable value.
*
* @since 2.3.0
*/
def nullable: Boolean = _nullable
def nullable: Boolean

/**
* Returns true iff the UDF is deterministic, i.e. the UDF produces the same output given the same
* input.
*
* @since 2.3.0
*/
def deterministic: Boolean = _deterministic
def deterministic: Boolean

/**
* Returns an expression that invokes the UDF, using the given arguments.
*
* @since 1.3.0
*/
@scala.annotation.varargs
def apply(exprs: Column*): Column = {
def apply(exprs: Column*): Column

/**
* Updates UserDefinedFunction with a given name.
*
* @since 2.3.0
*/
def withName(name: String): UserDefinedFunction

/**
* Updates UserDefinedFunction to non-nullable.
*
* @since 2.3.0
*/
def asNonNullable(): UserDefinedFunction

/**
* Updates UserDefinedFunction to nondeterministic.
*
* @since 2.3.0
*/
def asNondeterministic(): UserDefinedFunction
}
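
For orientation, a minimal usage sketch of the public surface the trait now defines (the `plus_one` UDF and the local-mode session are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object UdfSurfaceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udf-surface").getOrCreate()
    import spark.implicits._

    // Callers only see the trait's methods; the concrete implementation
    // (SparkUserDefinedFunction below) stays private[sql].
    val plusOne = udf((x: Int) => x + 1)
      .withName("plus_one")    // name shown in plans and error messages
      .asNonNullable()         // declares the UDF never returns null
      .asNondeterministic()    // stops the optimizer from assuming determinism

    val df = Seq(1, 2, 3).toDF("value")
    df.select(plusOne(col("value"))).show()

    spark.stop()
  }
}
```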

private[sql] case class SparkUserDefinedFunction(
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]],
nullableTypes: Option[Seq[Boolean]],
name: Option[String] = None,
nullable: Boolean = true,
deterministic: Boolean = true) extends UserDefinedFunction {

@scala.annotation.varargs
override def apply(exprs: Column*): Column = {
// TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()`
// and `nullableTypes` is always set.
if (nullableTypes.isEmpty) {
nullableTypes = Some(ScalaReflection.getParameterTypeNullability(f))
}
if (inputTypes.isDefined) {
assert(inputTypes.get.length == nullableTypes.get.length)
}

val inputsNullSafe = nullableTypes.getOrElse {
ScalaReflection.getParameterTypeNullability(f)
}

Column(ScalaUDF(
f,
dataType,
exprs.map(_.expr),
nullableTypes.get,
inputsNullSafe,
inputTypes.getOrElse(Nil),
udfName = _nameOption,
nullable = _nullable,
udfDeterministic = _deterministic))
}

private def copyAll(): UserDefinedFunction = {
val udf = copy()
udf._nameOption = _nameOption
udf._nullable = _nullable
udf._deterministic = _deterministic
udf.nullableTypes = nullableTypes
udf
udfName = name,
nullable = nullable,
udfDeterministic = deterministic))
}

/**
* Updates UserDefinedFunction with a given name.
*
* @since 2.3.0
*/
def withName(name: String): UserDefinedFunction = {
val udf = copyAll()
udf._nameOption = Option(name)
udf
override def withName(name: String): UserDefinedFunction = {
copy(name = Option(name))
}

/**
* Updates UserDefinedFunction to non-nullable.
*
* @since 2.3.0
*/
def asNonNullable(): UserDefinedFunction = {
override def asNonNullable(): UserDefinedFunction = {
if (!nullable) {
this
} else {
val udf = copyAll()
udf._nullable = false
udf
copy(nullable = false)
}
}

/**
* Updates UserDefinedFunction to nondeterministic.
*
* @since 2.3.0
*/
def asNondeterministic(): UserDefinedFunction = {
if (!_deterministic) {
override def asNondeterministic(): UserDefinedFunction = {
if (!deterministic) {
this
} else {
val udf = copyAll()
udf._deterministic = false
udf
copy(deterministic = false)
}
}
}
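
The shape of this refactoring — mutable `var`s plus a hand-written `copyAll()` replaced by the compiler-generated `copy()` of an immutable case class — in a standalone sketch (illustrative names, not Spark code):

```scala
sealed trait Flagged {
  def deterministic: Boolean
  def asNondeterministic(): Flagged
}

case class FlaggedImpl(deterministic: Boolean = true) extends Flagged {
  // copy() regenerates the instance with one field changed, replacing the
  // old pattern of cloning the object and mutating private vars by hand.
  override def asNondeterministic(): Flagged =
    if (!deterministic) this else copy(deterministic = false)
}

object FlaggedDemo extends App {
  val f = FlaggedImpl()
  val g = f.asNondeterministic()
  assert(g ne f)             // a new instance...
  assert(!g.deterministic)   // ...with the flag flipped,
  assert(f.deterministic)    // while the original is untouched
}
```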

// We have to use a name different from `UserDefinedFunction` here, to avoid breaking the binary
// compatibility of the auto-generated UserDefinedFunction object.
private[sql] object SparkUserDefinedFunction {

def create(
Expand All @@ -157,8 +149,7 @@ private[sql] object SparkUserDefinedFunction {
} else {
Some(inputSchemas.map(_.get.dataType))
}
val udf = new UserDefinedFunction(f, dataType, inputTypes)
udf.nullableTypes = Some(inputSchemas.map(_.map(_.nullable).getOrElse(true)))
udf
val nullableTypes = Some(inputSchemas.map(_.map(_.nullable).getOrElse(true)))
SparkUserDefinedFunction(f, dataType, inputTypes, nullableTypes)
}
}
sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4259,7 +4259,7 @@ object functions {
def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = {
// TODO: should call SparkUserDefinedFunction.create() instead but inputSchemas is currently
// unavailable. We may need to create type-safe overloaded versions of udf() methods.
new UserDefinedFunction(f, dataType, inputTypes = None)
SparkUserDefinedFunction(f, dataType, inputTypes = None, nullableTypes = None)
}
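
A hedged usage sketch for this untyped variant (the column and session names are illustrative): since no `TypeTag` is available for `f`, the result type must be supplied explicitly, and with `inputTypes = None` no input cast is added.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.IntegerType

object UntypedUdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("untyped-udf").getOrCreate()
    import spark.implicits._

    // The closure is passed as AnyRef; inputTypes and nullableTypes are None,
    // so the inputs are not checked and their nullability is assumed.
    val squared = udf((x: Int) => x * x, IntegerType)

    Seq(1, 2, 3).toDF("n").select(squared(col("n")).as("n_squared")).show()

    spark.stop()
  }
}
```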

/**