From cca1dda60b73c8f933861e5841305320f6a785ba Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 16 Aug 2017 12:27:03 +0800 Subject: [PATCH 1/3] top-most limit should not cause memory leak --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 7 ++++++- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 ++++ .../main/scala/org/apache/spark/sql/execution/limit.scala | 4 ++++ .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 +++++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 691f71a7d4ac..05a6e3977421 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.TakeOrderedAndProjectExec( limit, order, projectList, planLater(child)) :: Nil case logical.Limit(IntegerLiteral(limit), child) => - execution.CollectLimitExec(limit, planLater(child)) :: Nil + // Normally wrapping child with `LocalLimitExec` here is a no-op, because + // `CollectLimitExec.executeCollect` will call `LogicalLimitExec.executeTake`, which + // calls `child.executeTake`. If child supports whole stage codegen, adding this + // `LocalLimitExec` can break the input consuming loop inside whole stage codegen and + // trigger the resource releasing work, after we consume `limit` rows. + execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil case other => planLater(other) :: Nil } case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 34134db278ad..4fc3748d6021 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -474,6 +474,10 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { } private def supportCodegen(plan: SparkPlan): Boolean = plan match { + // Do not enable whole stage codegen for a single limit. + case limit: BaseLimitExec if !limit.child.isInstanceOf[CodegenSupport] || + !limit.child.asInstanceOf[CodegenSupport].supportCodegen => + false case plan: CodegenSupport if plan.supportCodegen => val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined) // the generated code will be huge if there are too many columns diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 73a0f8735ed4..679f02bd0c93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -54,6 +54,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { val limit: Int override def output: Seq[Attribute] = child.output + override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit)) + + override def executeCollect(): Array[InternalRow] = child.executeTake(limit) + protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e95f6dba4607..923c6d8eb71f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1)) } } + + test("SPARK-21743: top-most limit should not cause memory leak") { + // In unit test, Spark will fail the query if memory leak detected. + spark.range(100).groupBy("id").count().limit(1).collect() + } } From 4a88206f13ce43a92e4a16b5fcb4a64c2c0754aa Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 16 Aug 2017 21:56:58 +0800 Subject: [PATCH 2/3] address comments --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 ---- .../main/scala/org/apache/spark/sql/execution/limit.scala | 6 ++++++ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 05a6e3977421..996eec2dc7f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -75,8 +75,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Normally wrapping child with `LocalLimitExec` here is a no-op, because // `CollectLimitExec.executeCollect` will call `LogicalLimitExec.executeTake`, which // calls `child.executeTake`. If child supports whole stage codegen, adding this - // `LocalLimitExec` can break the input consuming loop inside whole stage codegen and - // trigger the resource releasing work, after we consume `limit` rows. + // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the + // resource releasing work, after we consume `limit` rows. execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil case other => planLater(other) :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 4fc3748d6021..34134db278ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -474,10 +474,6 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { } private def supportCodegen(plan: SparkPlan): Boolean = plan match { - // Do not enable whole stage codegen for a single limit. - case limit: BaseLimitExec if !limit.child.isInstanceOf[CodegenSupport] || - !limit.child.asInstanceOf[CodegenSupport].supportCodegen => - false case plan: CodegenSupport if plan.supportCodegen => val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined) // the generated code will be huge if there are too many columns diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 679f02bd0c93..9a4511d78adc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -54,6 +54,12 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { val limit: Int override def output: Seq[Attribute] = child.output + // Do not enable whole stage codegen for a single limit. + override def supportCodegen: Boolean = child match { + case plan: CodegenSupport => plan.supportCodegen + case _ => false + } + override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit)) override def executeCollect(): Array[InternalRow] = child.executeTake(limit) From 44627788b9af15e84ec951543a56c7c9970ef247 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 17 Aug 2017 09:01:39 +0800 Subject: [PATCH 3/3] more comments --- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 +- .../src/main/scala/org/apache/spark/sql/execution/limit.scala | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 996eec2dc7f7..2e8ce4541865 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -73,7 +73,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { limit, order, projectList, planLater(child)) :: Nil case logical.Limit(IntegerLiteral(limit), child) => // Normally wrapping child with `LocalLimitExec` here is a no-op, because - // `CollectLimitExec.executeCollect` will call `LogicalLimitExec.executeTake`, which + // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which // calls `child.executeTake`. If child supports whole stage codegen, adding this // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the // resource releasing work, after we consume `limit` rows. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 9a4511d78adc..7cef5569717a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -62,8 +62,6 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit)) - override def executeCollect(): Array[InternalRow] = child.executeTake(limit) - protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) }