Do not use case class as public API (UserDefinedFunction)
cloud-fan committed Nov 30, 2018
commit 6c0971fe68a429bf8dc24e21c16e3d0e137a1a0a
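
For context on the motivation behind this change: a public case class bakes the compiler-generated members (apply, copy, unapply, and the constructor) into the binary API, so almost any evolution of its fields is a breaking change. A minimal sketch, not Spark code, with made-up names:

    // v1 of a hypothetical public API:
    //   case class Point(x: Int)
    // v2 adds a field, and every generated signature changes:
    //   Point.apply(Int)           -> Point.apply(Int, Int)
    //   point.copy(x: Int)         -> point.copy(x: Int, y: Int)
    //   Point.unapply: Option[Int] -> Option[(Int, Int)]
    // Bytecode compiled against v1 no longer links against v2.
    case class Point(x: Int, y: Int = 0)

Exposing a trait and keeping the implementing case class private[sql], as this commit does, removes those generated members from the public surface.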
project/MimaExcludes.scala (6 changes: 5 additions & 1 deletion)

@@ -227,7 +227,11 @@ object MimaExcludes {
       case ReversedMissingMethodProblem(meth) =>
         !meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2")
       case _ => true
-    }
+    },
+
+    // [SPARK-26216][SQL] Do not use case class as public API (UserDefinedFunction)
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
Member: Can we get rid of this in #23351?

+    ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.expressions.UserDefinedFunction")
   )

   // Exclude rules for 2.4.x
sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala

@@ -38,114 +38,108 @@ import org.apache.spark.sql.types.DataType
  * @since 1.3.0
  */
 @Stable
Member: I'm +1 for this PR, but I'm just wondering whether this @Stable tag is still valid here together with the @since 1.3.0 tag. The previous case class was stable until 2.4.x, and the new trait will be stable from 3.0, but stability is broken once, at 3.0.0. Did I understand correctly?

Member: Would it be better to change it to @Stable with @since 3.0.0?

Member: Yeah, actually I was wondering about the same thing.

Member: I'd go ahead and leave the Since version. The API is essentially unchanged, though there are some marginal breaking compile-time changes. But the same is true of many things we are changing in 3.0. I've tagged the JIRA with release-notes and will add a blurb about the change.

Contributor (author): It's not a new API anyway; it would be weird to change the since tag to 3.0.

Member: Got it. Thank you, @HyukjinKwon, @srowen, @cloud-fan.

-case class UserDefinedFunction protected[sql] (
-    f: AnyRef,
-    dataType: DataType,
-    inputTypes: Option[Seq[DataType]]) {
-
-  private var _nameOption: Option[String] = None
-  private var _nullable: Boolean = true
-  private var _deterministic: Boolean = true
-
-  // This is a `var` instead of in the constructor for backward compatibility of this case class.
-  // TODO: revisit this case class in Spark 3.0, and narrow down the public surface.
-  private[sql] var nullableTypes: Option[Seq[Boolean]] = None
+trait UserDefinedFunction {
Member: Should we make this sealed? I'm not sure. Would any user ever extend this meaningfully? I kind of worry someone will start doing so; maybe they already subclass it in some cases, though. Elsewhere, it might help the compiler understand in match statements that there is only ever one type of UDF class to match on.

Contributor (author): Good idea! Though I'm not sure whether sealed works for Java.
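
For illustration, a minimal sketch (made-up names, not Spark code) of what sealed would buy. All subtypes must live in the same source file, so the compiler can prove a match exhaustive; a side effect is that Java sources, which are always separate files, cannot extend the trait at all:

    sealed trait Udf {
      def deterministic: Boolean
    }

    case class ScalaUdf(deterministic: Boolean) extends Udf

    object Demo {
      // The compiler knows ScalaUdf is the only subtype, so this match
      // needs no catch-all case and triggers no exhaustiveness warning.
      def describe(u: Udf): String = u match {
        case ScalaUdf(d) => s"deterministic=$d"
      }
    }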


   /**
    * Returns true when the UDF can return a nullable value.
    *
    * @since 2.3.0
    */
-  def nullable: Boolean = _nullable
+  def nullable: Boolean

   /**
    * Returns true iff the UDF is deterministic, i.e. the UDF produces the same output given the same
    * input.
    *
    * @since 2.3.0
    */
-  def deterministic: Boolean = _deterministic
+  def deterministic: Boolean

   /**
    * Returns an expression that invokes the UDF, using the given arguments.
    *
    * @since 1.3.0
    */
   @scala.annotation.varargs
-  def apply(exprs: Column*): Column = {
+  def apply(exprs: Column*): Column

+  /**
+   * Updates UserDefinedFunction with a given name.
+   *
+   * @since 2.3.0
+   */
+  def withName(name: String): UserDefinedFunction
+
+  /**
+   * Updates UserDefinedFunction to non-nullable.
+   *
+   * @since 2.3.0
+   */
+  def asNonNullable(): UserDefinedFunction
+
+  /**
+   * Updates UserDefinedFunction to nondeterministic.
+   *
+   * @since 2.3.0
+   */
+  def asNondeterministic(): UserDefinedFunction
+}

+private[sql] case class SparkUserDefinedFunction(
+    f: AnyRef,
+    dataType: DataType,
+    inputTypes: Option[Seq[DataType]],
+    nullableTypes: Option[Seq[Boolean]],
+    name: Option[String] = None,
+    nullable: Boolean = true,
+    deterministic: Boolean = true) extends UserDefinedFunction {
+
+  @scala.annotation.varargs
+  override def apply(exprs: Column*): Column = {
     // TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()`
     // and `nullableTypes` is always set.
-    if (nullableTypes.isEmpty) {
-      nullableTypes = Some(ScalaReflection.getParameterTypeNullability(f))
-    }
-    if (inputTypes.isDefined) {
-      assert(inputTypes.get.length == nullableTypes.get.length)
-    }

+    val inputsNullSafe = if (nullableTypes.isEmpty) {
Member: You can use getOrElse here, and even inline this into the call below, but I don't really care.
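
A sketch of that suggestion; Option.getOrElse is equivalent to the if/else on nullableTypes below:

    val inputsNullSafe = nullableTypes.getOrElse(ScalaReflection.getParameterTypeNullability(f))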

+      ScalaReflection.getParameterTypeNullability(f)
+    } else {
+      nullableTypes.get
+    }

     Column(ScalaUDF(
       f,
       dataType,
       exprs.map(_.expr),
-      nullableTypes.get,
+      inputsNullSafe,
       inputTypes.getOrElse(Nil),
-      udfName = _nameOption,
-      nullable = _nullable,
-      udfDeterministic = _deterministic))
-  }
-
-  private def copyAll(): UserDefinedFunction = {
-    val udf = copy()
-    udf._nameOption = _nameOption
-    udf._nullable = _nullable
-    udf._deterministic = _deterministic
-    udf.nullableTypes = nullableTypes
-    udf
+      udfName = name,
+      nullable = nullable,
+      udfDeterministic = deterministic))
   }

   /**
    * Updates UserDefinedFunction with a given name.
    *
    * @since 2.3.0
    */
-  def withName(name: String): UserDefinedFunction = {
-    val udf = copyAll()
-    udf._nameOption = Option(name)
-    udf
+  override def withName(name: String): UserDefinedFunction = {
+    copy(name = Option(name))
   }

   /**
    * Updates UserDefinedFunction to non-nullable.
    *
    * @since 2.3.0
    */
-  def asNonNullable(): UserDefinedFunction = {
+  override def asNonNullable(): UserDefinedFunction = {
     if (!nullable) {
       this
     } else {
-      val udf = copyAll()
-      udf._nullable = false
-      udf
+      copy(nullable = false)
     }
   }

   /**
    * Updates UserDefinedFunction to nondeterministic.
    *
    * @since 2.3.0
    */
-  def asNondeterministic(): UserDefinedFunction = {
-    if (!_deterministic) {
+  override def asNondeterministic(): UserDefinedFunction = {
+    if (!deterministic) {
       this
     } else {
-      val udf = copyAll()
-      udf._deterministic = false
-      udf
+      copy(deterministic = false)
     }
   }
 }
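
Distilled, the refactor above swaps mutable vars plus a hand-written copyAll() for immutable constructor parameters updated through the case class's generated copy(), which is safe to rely on now that the case class is private[sql]. A minimal sketch with made-up names:

    case class Conf(name: Option[String] = None, deterministic: Boolean = true) {
      // Each update returns a new immutable instance instead of mutating vars.
      def asNondeterministic(): Conf =
        if (!deterministic) this else copy(deterministic = false)
    }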

 // We have to use a name different than `UserDefinedFunction` here, to avoid breaking the binary
 // compatibility of the auto-generate UserDefinedFunction object.
 private[sql] object SparkUserDefinedFunction {

   def create(
@@ -157,8 +151,7 @@ private[sql] object SparkUserDefinedFunction {
     } else {
       Some(inputSchemas.map(_.get.dataType))
     }
-    val udf = new UserDefinedFunction(f, dataType, inputTypes)
-    udf.nullableTypes = Some(inputSchemas.map(_.map(_.nullable).getOrElse(true)))
-    udf
+    val nullableTypes = Some(inputSchemas.map(_.map(_.nullable).getOrElse(true)))
+    SparkUserDefinedFunction(f, dataType, inputTypes, nullableTypes)
   }
 }
sql/core/src/main/scala/org/apache/spark/sql/functions.scala

@@ -4259,7 +4259,7 @@ object functions {
   def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = {
     // TODO: should call SparkUserDefinedFunction.create() instead but inputSchemas is currently
     // unavailable. We may need to create type-safe overloaded versions of udf() methods.
-    new UserDefinedFunction(f, dataType, inputTypes = None)
+    SparkUserDefinedFunction(f, dataType, inputTypes = None, nullableTypes = None)
   }

/**
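As a closing sanity check: since the public method signatures are unchanged, typical caller code such as the following should compile as before against the new trait. A hedged sketch; df and its column name are made up:

    import org.apache.spark.sql.functions.udf

    // udf(...) still returns a UserDefinedFunction -- now the trait,
    // backed by the private[sql] SparkUserDefinedFunction.
    val upper = udf((s: String) => s.toUpperCase)
      .withName("upper")
      .asNonNullable()

    df.select(upper(df("name")))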