Merge remote-tracking branch 'upstream/master' into testMaster22
gatorsmile committed Jul 25, 2018
commit 328adddc0c1870400e92934827150df2c98731f6
33 changes: 33 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -19,10 +19,14 @@ package org.apache.spark.sql

import org.apache.spark.sql.api.java._
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand}
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.functions.{lit, udf}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
@@ -364,4 +368,33 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
val udf1 = udf({(x: Int, y: Int) => x + y})
val df = spark.range(0, 3).toDF("a")
.withColumn("b", udf1($"a", udf1($"a", lit(10))))
.withColumn("c", udf1($"a", lit(null)))
val plan = spark.sessionState.executePlan(df.logicalPlan).analyzed

comparePlans(df.logicalPlan, plan)
checkAnswer(
df,
Seq(
Row(0, 10, null),
Row(1, 12, null),
Row(2, 14, null)))
}

test("SPARK-24891 Fix HandleNullInputsForUDF rule - with table") {
withTable("x") {
Seq((1, "2"), (2, "4")).toDF("a", "b").write.format("json").saveAsTable("x")
sql("insert into table x values(3, null)")
sql("insert into table x values(null, '4')")
spark.udf.register("f", (a: Int, b: String) => a + b)
val df = spark.sql("SELECT f(a, b) FROM x")
val plan = spark.sessionState.executePlan(df.logicalPlan).analyzed
comparePlans(df.logicalPlan, plan)
checkAnswer(df, Seq(Row("12"), Row("24"), Row("3null"), Row(null)))
}
}
}
You are viewing a condensed version of this merge commit.
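
For context, a minimal standalone sketch (not part of this commit; the object name, app name, and local master below are illustrative) of the behaviour the new tests exercise: a Scala UDF whose parameters are primitive types such as Int cannot receive null, so the analyzer's HandleNullInputsForUDF rule guards the call with a null check and the whole expression evaluates to null whenever an input is null.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, udf}

// Standalone sketch only; assumes a local Spark build (roughly 2.3+) on the classpath.
object HandleNullInputsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("HandleNullInputsSketch")
      .getOrCreate()
    import spark.implicits._

    // Same shape as the first test above: an Int-typed UDF called with a null literal.
    val add = udf((x: Int, y: Int) => x + y)
    val df = spark.range(0, 3).toDF("a")
      .withColumn("b", add($"a", lit(10)))
      .withColumn("c", add($"a", lit(null)))

    // Column "c" is null on every row: the analyzer inserts a null check around the
    // primitive-typed UDF instead of passing null into the Scala function.
    df.show()

    spark.stop()
  }
}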