From ad6df5ce4aade843966efa723d3ad1ffcbeb8541 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Mon, 23 Sep 2019 17:11:55 +0800
Subject: [PATCH 1/8] [SPARK-29213] Make it consistent when getting notnull
 output and generating null checks in FilterExec

---
 .../execution/basicPhysicalOperators.scala    |  2 +-
 .../sql/hive/execution/SQLQuerySuite.scala    | 25 +++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index b072a7f5d914..65e5288a838b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -117,7 +117,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
 
   override def output: Seq[Attribute] = {
     child.output.map { a =>
-      if (a.nullable && notNullAttributes.contains(a.exprId)) {
+      if (a.nullable && notNullPreds.exists(_.semanticEquals(a))) {
         a.withNullability(false)
       } else {
         a
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1638f6cd9180..837858de321a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2412,4 +2412,29 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-29213 Make it consistent when get notnull output and generate null " +
+    "checks in FilterExec") {
+    withTable("table1", "table2", "table3") {
+      sql("create table table1(x string)")
+      sql("create table table2(x bigint)")
+      sql("create table table3(x string)")
+      sql("insert into table2 select null as x")
+      sql(
+        """
+          |select t1.x
+          |from (
+          |  select x from table1) t1
+          |left join (
+          |  select x from (
+          |    select x from table2
+          |    union all
+          |    select substr(x,5) x from table3
+          |  ) a
+          |  where length(x)>0
+          |) t3
+          |on t1.x=t3.x
+        """.stripMargin).collect()
+    }
+  }
 }

From 270172bb23688ff823bb7f93e3f26ca357f52568 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Tue, 24 Sep 2019 11:53:11 +0800
Subject: [PATCH 2/8] remove exprIds present in both notNullPreds and
 otherPreds when computing notNullAttributes

---
 .../apache/spark/sql/execution/basicPhysicalOperators.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 65e5288a838b..48b5928e5ad5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -110,6 +110,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
 
   // The columns that will filtered out by `IsNotNull` could be considered as not nullable.
   private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
+    .diff(otherPreds.flatMap(_.references).distinct.map(_.exprId))
 
   // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
   // all the variables at the beginning to take advantage of short circuiting.
@@ -117,7 +118,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
 
   override def output: Seq[Attribute] = {
     child.output.map { a =>
-      if (a.nullable && notNullPreds.exists(_.semanticEquals(a))) {
+      if (a.nullable && notNullAttributes.contains(a.exprId)) {
         a.withNullability(false)
       } else {
         a

From 9ad9723b84eb874af9bbf8b89192c8cb2116f219 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Tue, 24 Sep 2019 12:40:47 +0800
Subject: [PATCH 3/8] modify test table names

---
 .../spark/sql/hive/execution/SQLQuerySuite.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 837858de321a..685e9e506ee2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2415,21 +2415,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   test("SPARK-29213 Make it consistent when get notnull output and generate null " +
     "checks in FilterExec") {
-    withTable("table1", "table2", "table3") {
-      sql("create table table1(x string)")
-      sql("create table table2(x bigint)")
-      sql("create table table3(x string)")
-      sql("insert into table2 select null as x")
+    withTable("table_29213_1", "table_29213_2", "table_29213_3") {
+      sql("create table table_29213_1(x string)")
+      sql("create table table_29213_2(x bigint)")
+      sql("create table table_29213_3(x string)")
+      sql("insert into table_29213_2 select null as x")
       sql(
         """
           |select t1.x
           |from (
-          |  select x from table1) t1
+          |  select x from table_29213_1) t1
           |left join (
           |  select x from (
-          |    select x from table2
+          |    select x from table_29213_2
           |    union all
-          |    select substr(x,5) x from table3
+          |    select substr(x,5) x from table_29213_3
           |  ) a
           |  where length(x)>0
           |) t3

From c03dd959eb3ed58c2a67a5b85d6d5b7c48e25464 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Wed, 25 Sep 2019 15:43:52 +0800
Subject: [PATCH 4/8] Some slight modifications so we no longer depend on
 Hive.

This is from https://github.com/JoshRosen/spark/commit/d1658bbffb07d2b0c5f3aabc9362b397a2c0aeb8
---
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 27 +++++++++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala    | 25 -----------------
 2 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b5d021549c7a..eb78955c4da7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3233,6 +3233,33 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-29213 Make it consistent when get notnull output and generate null " +
+    "checks in FilterExec") {
+    withView("t1", "t2", "t3") {
+      sql("select ''").as[String].map(identity).toDF("x").createOrReplaceTempView("t1")
+      sql("select * from values 0, cast(null as bigint)")
+        .as[java.lang.Long]
+        .map(identity)
+        .toDF("x")
+        .createOrReplaceTempView("t2")
+      sql("select ''").as[String].map(identity).toDF("x").createOrReplaceTempView("t3")
+      sql(
+        """
+          |select t1.x
+          |from t1
+          |left join (
+          |  select x from (
+          |    select x from t2
+          |    union all
+          |    select substr(x,5) x from t3
+          |  ) a
+          |  where length(x)>0
+          |) t3
+          |on t1.x=t3.x
+        """.stripMargin).collect()
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 685e9e506ee2..1638f6cd9180 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2412,29 +2412,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
-
-  test("SPARK-29213 Make it consistent when get notnull output and generate null " +
-    "checks in FilterExec") {
-    withTable("table_29213_1", "table_29213_2", "table_29213_3") {
-      sql("create table table_29213_1(x string)")
-      sql("create table table_29213_2(x bigint)")
-      sql("create table table_29213_3(x string)")
-      sql("insert into table_29213_2 select null as x")
-      sql(
-        """
-          |select t1.x
-          |from (
-          |  select x from table_29213_1) t1
-          |left join (
-          |  select x from (
-          |    select x from table_29213_2
-          |    union all
-          |    select substr(x,5) x from table_29213_3
-          |  ) a
-          |  where length(x)>0
-          |) t3
-          |on t1.x=t3.x
-        """.stripMargin).collect()
-    }
-  }
 }

From 30e2ac725c9d2acdace84e22701779bebb8d1de9 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Wed, 25 Sep 2019 17:19:23 +0800
Subject: [PATCH 5/8] Generate extra IsNotNull predicate for null check

---
 .../spark/sql/execution/basicPhysicalOperators.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 48b5928e5ad5..57161d70a8e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit._
 
+import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
@@ -110,7 +111,6 @@ case class FilterExec(condition: Expression, child: SparkPlan)
 
   // The columns that will filtered out by `IsNotNull` could be considered as not nullable.
   private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
-    .diff(otherPreds.flatMap(_.references).distinct.map(_.exprId))
 
   // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
   // all the variables at the beginning to take advantage of short circuiting.
@@ -172,6 +172,13 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     // This is very perf sensitive.
     // TODO: revisit this. We can consider reordering predicates as well.
     val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
+
+    val extraIsNotNullReferences = mutable.Set[Attribute]()
+
+    def outputContainsNotNull(ref: Attribute): Boolean = {
+      output.exists { attr => attr.exprId == ref.exprId }
+    }
+
     val generated = otherPreds.map { c =>
       val nullChecks = c.references.map { r =>
         val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
@@ -179,6 +186,9 @@ case class FilterExec(condition: Expression, child: SparkPlan)
           generatedIsNotNullChecks(idx) = true
           // Use the child's output. The nullability is what the child produced.
          genPredicate(notNullPreds(idx), input, child.output)
+        } else if (outputContainsNotNull(r) && !extraIsNotNullReferences.contains(r)) {
+          extraIsNotNullReferences += r
+          genPredicate(IsNotNull(r), input, child.output)
         } else {
           ""
         }

From 8480de279f283d0f5a8437f91a43b4e8a23e82b9 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Thu, 26 Sep 2019 09:57:28 +0800
Subject: [PATCH 6/8] Address some comments

---
 .../spark/sql/execution/basicPhysicalOperators.scala | 10 +++-------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala   |  3 +--
 2 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 57161d70a8e2..ae8ac310af02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -173,11 +173,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     // TODO: revisit this. We can consider reordering predicates as well.
     val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
 
-    val extraIsNotNullReferences = mutable.Set[Attribute]()
-
-    def outputContainsNotNull(ref: Attribute): Boolean = {
-      output.exists { attr => attr.exprId == ref.exprId }
-    }
+    val extraIsNotNullAttrs = mutable.Set[Attribute]()
 
     val generated = otherPreds.map { c =>
       val nullChecks = c.references.map { r =>
@@ -186,8 +182,8 @@ case class FilterExec(condition: Expression, child: SparkPlan)
          generatedIsNotNullChecks(idx) = true
          // Use the child's output. The nullability is what the child produced.
          genPredicate(notNullPreds(idx), input, child.output)
-        } else if (outputContainsNotNull(r) && !extraIsNotNullReferences.contains(r)) {
-          extraIsNotNullReferences += r
+        } else if (notNullAttributes.contains(r.exprId) && !extraIsNotNullAttrs.contains(r)) {
+          extraIsNotNullAttrs += r
           genPredicate(IsNotNull(r), input, child.output)
         } else {
           ""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index eb78955c4da7..4eb0a5e8362b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3234,8 +3234,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-29213 Make it consistent when get notnull output and generate null " +
-    "checks in FilterExec") {
+  test("SPARK-29213: FilterExec should not throw NPE") {
     withView("t1", "t2", "t3") {
       sql("select ''").as[String].map(identity).toDF("x").createOrReplaceTempView("t1")
       sql("select * from values 0, cast(null as bigint)")

From a8cb828d8c247ea2281c022083306319c7804b0d Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Thu, 26 Sep 2019 10:04:53 +0800
Subject: [PATCH 7/8] Code style modification

---
 .../org/apache/spark/sql/execution/basicPhysicalOperators.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index ae8ac310af02..ca0cfb6834f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -172,9 +172,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     // This is very perf sensitive.
     // TODO: revisit this. We can consider reordering predicates as well.
     val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
-
     val extraIsNotNullAttrs = mutable.Set[Attribute]()
-
     val generated = otherPreds.map { c =>
       val nullChecks = c.references.map { r =>
         val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}

From 124ad87721c3e5f1e08f5d2f26cc940d281d79d9 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Thu, 26 Sep 2019 13:49:19 +0800
Subject: [PATCH 8/8] Refine test

---
 .../org/apache/spark/sql/SQLQuerySuite.scala | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4eb0a5e8362b..676e10fe59dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3235,27 +3235,27 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-29213: FilterExec should not throw NPE") {
-    withView("t1", "t2", "t3") {
-      sql("select ''").as[String].map(identity).toDF("x").createOrReplaceTempView("t1")
-      sql("select * from values 0, cast(null as bigint)")
+    withTempView("t1", "t2", "t3") {
+      sql("SELECT ''").as[String].map(identity).toDF("x").createOrReplaceTempView("t1")
+      sql("SELECT * FROM VALUES 0, CAST(NULL AS BIGINT)")
         .as[java.lang.Long]
         .map(identity)
         .toDF("x")
         .createOrReplaceTempView("t2")
-      sql("select ''").as[String].map(identity).toDF("x").createOrReplaceTempView("t3")
+      sql("SELECT ''").as[String].map(identity).toDF("x").createOrReplaceTempView("t3")
       sql(
         """
-          |select t1.x
-          |from t1
-          |left join (
-          |  select x from (
-          |    select x from t2
-          |    union all
-          |    select substr(x,5) x from t3
+          |SELECT t1.x
+          |FROM t1
+          |LEFT JOIN (
+          |  SELECT x FROM (
+          |    SELECT x FROM t2
+          |    UNION ALL
+          |    SELECT SUBSTR(x,5) x FROM t3
           |  ) a
-          |  where length(x)>0
+          |  WHERE LENGTH(x)>0
           |) t3
-          |on t1.x=t3.x
+          |ON t1.x=t3.x
         """.stripMargin).collect()
     }
   }
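
Reproduction note (an illustration accompanying the series, not part of any patch): the scenario exercised by the final test can be run standalone. Below is a minimal Scala sketch, assuming a spark-shell session so that `spark` and `spark.implicits._` are already in scope; the view names and the query mirror the test added in PATCH 8/8. On a build without this series, the final collect() could fail with a NullPointerException in FilterExec's generated code, because FilterExec.output decided non-nullability by exprId (via notNullAttributes) while the generated null checks required a semanticEquals match against the inferred IsNotNull predicates.

    // Minimal sketch for SPARK-29213, assuming a spark-shell session.
    import spark.implicits._

    // The typed map(identity) round-trips matter: they yield attributes that
    // make the inferred IsNotNull predicate fail the semanticEquals
    // comparison even though the exprIds still match.
    spark.sql("SELECT ''").as[String].map(identity).toDF("x")
      .createOrReplaceTempView("t1")
    spark.sql("SELECT * FROM VALUES 0, CAST(NULL AS BIGINT)") // t2 holds a real NULL
      .as[java.lang.Long]
      .map(identity)
      .toDF("x")
      .createOrReplaceTempView("t2")
    spark.sql("SELECT ''").as[String].map(identity).toDF("x")
      .createOrReplaceTempView("t3")

    // LENGTH(x)>0 is evaluated over the union; before this series the
    // generated filter could touch the NULL row without a null check.
    spark.sql(
      """
        |SELECT t1.x
        |FROM t1
        |LEFT JOIN (
        |  SELECT x FROM (
        |    SELECT x FROM t2
        |    UNION ALL
        |    SELECT SUBSTR(x,5) x FROM t3
        |  ) a
        |  WHERE LENGTH(x)>0
        |) t3
        |ON t1.x=t3.x
      """.stripMargin).collect()

This is also why PATCH 5/8 takes the shape it does: when a reference's exprId is covered by notNullAttributes but no inferred IsNotNull predicate matches it semantically, FilterExec now generates an extra IsNotNull(r) check, keeping the generated null checks consistent with the non-nullability advertised by output.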