From c5946e8a6e5b3cbdeff87eb4ca18acdf09d6896d Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Thu, 29 Aug 2024 20:26:05 +0800 Subject: [PATCH 1/4] fix NPE in EmptyRelationExec --- .../sql/execution/EmptyRelationExec.scala | 11 +++-- .../adaptive/AdaptiveQueryExecSuite.scala | 41 +++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index 085c0b22524c..9039735bec22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -83,10 +83,13 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo } override protected[sql] def cleanupResources(): Unit = { - logical.foreach { - case LogicalQueryStage(_, physical) => - physical.cleanupResources() - case _ => + // This code path might be executed in executor where `logical` could be null. 
+ if (logical != null) { + logical.foreach { + case LogicalQueryStage(_, physical) => + physical.cleanupResources() + case _ => + } } super.cleanupResources() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index fc54e7ecd46d..3d3d6a181f33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1608,6 +1608,47 @@ class AdaptiveQueryExecSuite } } + test("SPARK-49460: NPE error in EmptyRelationExec.cleanupResources") { + try { + spark.sql("create table t1left (a int, b int);") + spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") + spark.sql("create table t1right (a int, b int);") + spark.sql("create table t1empty (a int, b int);") + spark.sql("insert into t1right values (2,20), (4, 40);") + + spark.sql(""" + |with leftT as ( + | with erp as ( + | select + | * + | from + | t1left + | join t1empty on t1left.a = t1empty.a + | join t1right on t1left.a = t1right.a + | ) + | SELECT + | CASE + | WHEN COUNT(*) = 0 THEN 4 + | ELSE NULL + | END AS a + | FROM + | erp + | HAVING + | COUNT(*) = 0 + |) + |select + | /*+ MERGEJOIN(t1right) */ + | * + |from + | leftT + | join t1right on leftT.a = t1right.a""".stripMargin).collect() + } finally { + Seq("t1left", "t1right", "t1empty").foreach { table => + spark.sql(s"drop table if exists ${table}") + } + } + } + test("SPARK-35585: Support propagate empty relation through project/filter") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { From 0c06fe0230162e866f768fc821be0e9d3e294080 Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Thu, 29 Aug 2024 23:32:51 +0800 Subject: [PATCH 2/4] address comments --- .../apache/spark/sql/execution/EmptyRelationExec.scala | 4 
++-- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index 9039735bec22..32b3ad101a04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -84,8 +84,8 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo override protected[sql] def cleanupResources(): Unit = { // This code path might be executed in executor where `logical` could be null. - if (logical != null) { - logical.foreach { + Option(logical).foreach { l => + l.foreach { case LogicalQueryStage(_, physical) => physical.cleanupResources() case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 3d3d6a181f33..938a96a86b01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1608,8 +1608,8 @@ class AdaptiveQueryExecSuite } } - test("SPARK-49460: NPE error in EmptyRelationExec.cleanupResources") { - try { + test("SPARK-49460: NPE in EmptyRelationExec.cleanupResources") { + withTable("t1left", "t1right", "t1empty") { spark.sql("create table t1left (a int, b int);") spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") spark.sql("create table t1right (a int, b int);") @@ -1642,10 +1642,6 @@ class AdaptiveQueryExecSuite |from | leftT | join t1right on leftT.a = t1right.a""".stripMargin).collect() - } finally { - Seq("t1left", "t1right", "t1empty").foreach { table => - spark.sql(s"drop table if exists 
${table}") - } } } From 23af0da557b53f9e282dd3c52344777c931504cd Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Fri, 30 Aug 2024 11:20:02 +0800 Subject: [PATCH 3/4] remove cleanupResources impl from EmptyRelationExec --- .../spark/sql/execution/EmptyRelationExec.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index 32b3ad101a04..46cc3dd94e52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -81,16 +81,4 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo override def doCanonicalize(): SparkPlan = { this.copy(logical = LocalRelation(logical.output).canonicalized) } - - override protected[sql] def cleanupResources(): Unit = { - // This code path might be executed in executor where `logical` could be null. 
- Option(logical).foreach { l => - l.foreach { - case LogicalQueryStage(_, physical) => - physical.cleanupResources() - case _ => - } - } - super.cleanupResources() - } } From b9ba70c9b89155bea76fdf4a1c469862145c93ab Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Fri, 30 Aug 2024 21:33:29 +0800 Subject: [PATCH 4/4] remove unused LogicalQueryStage import from EmptyRelationExec --- .../scala/org/apache/spark/sql/execution/EmptyRelationExec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index 46cc3dd94e52..8a544de7567e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.adaptive.LogicalQueryStage import org.apache.spark.sql.vectorized.ColumnarBatch /**