add comments
Davies Liu committed Mar 29, 2016
commit c57e8a4c520e95c5a5ce3c69849818ef9561fca2
@@ -26,25 +26,30 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Extracts PythonUDFs from operators, rewriting the query plan so that the UDF can be evaluated
  * alone in a batch.
  *
+ * Only extracts the PythonUDFs that could be evaluated in Python (the single child is PythonUDFs
+ * or all the children could be evaluated in JVM).
+ *
  * This has the limitation that the input to the Python UDF is not allowed include attributes from
  * multiple child operators.
  */
 private[spark] object ExtractPythonUDFs extends Rule[LogicalPlan] {
 
-  private def hasUDF(e: Expression): Boolean = {
+  private def hasPythonUDF(e: Expression): Boolean = {
     e.find(_.isInstanceOf[PythonUDF]).isDefined
   }
 
-  private def canEvaluate(e: PythonUDF): Boolean = {
+  private def canEvaluateInPython(e: PythonUDF): Boolean = {
     e.children match {
-      case Seq(u: PythonUDF) => canEvaluate(u)
-      case children => !children.exists(hasUDF)
+      // single PythonUDF child could be chained and evaluated in Python
+      case Seq(u: PythonUDF) => canEvaluateInPython(u)
+      // Python UDF can't be evaluated directly in JVM
+      case children => !children.exists(hasPythonUDF)
     }
   }
 
   private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = {
     expr.collect {
-      case udf: PythonUDF if canEvaluateInPython(udf) => udf
+      case udf: PythonUDF if canEvaluateInPython(udf) => udf
     }
   }
 
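For readers skimming the hunk above, here is a minimal, self-contained sketch of the predicate that canEvaluateInPython implements. The Expr, JvmExpr, and PyUDF types are simplified stand-ins invented for illustration, not Spark's Catalyst classes; only the chaining logic mirrors the patch.

// Hypothetical stand-in expression tree, NOT Spark's Catalyst Expression hierarchy.
sealed trait Expr { def children: Seq[Expr] }
case class JvmExpr(children: Seq[Expr] = Nil) extends Expr  // evaluable in the JVM
case class PyUDF(children: Seq[Expr] = Nil) extends Expr    // must run in Python

object ChainingSketch {
  // True if any node in the tree is a Python UDF.
  def hasPythonUDF(e: Expr): Boolean =
    e.isInstanceOf[PyUDF] || e.children.exists(hasPythonUDF)

  // Mirrors canEvaluateInPython: a UDF qualifies if its single child is itself a
  // qualifying UDF (so the chain can run in one Python batch), or if none of its
  // children contain a Python UDF (so the JVM can compute all of its inputs).
  def canEvaluateInPython(e: PyUDF): Boolean = e.children match {
    case Seq(u: PyUDF) => canEvaluateInPython(u)
    case children      => !children.exists(hasPythonUDF)
  }

  def main(args: Array[String]): Unit = {
    val chained = PyUDF(Seq(PyUDF(Seq(JvmExpr()))))  // like udf2(udf1(col))
    val mixed   = PyUDF(Seq(JvmExpr(Seq(PyUDF()))))  // like udf(col + otherUdf(col))
    println(canEvaluateInPython(chained))  // true: extracted as one chained batch
    println(canEvaluateInPython(mixed))    // false: input still contains a Python UDF
  }
}

Under this check, a UDF whose only child is another qualifying UDF is extracted and evaluated together in one Python batch, while a UDF whose JVM-side input itself contains a Python UDF is not extracted by this rule.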