@@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.CollectLimitExec(limit, planLater(child)) :: Nil
// Normally wrapping child with `LocalLimitExec` here is a no-op, because
// `CollectLimitExec.executeCollect` will call `LogicalLimitExec.executeTake`, which
Member: typo? LogicalLimitExec -> LocalLimitExec.

// calls `child.executeTake`. If child supports whole stage codegen, adding this
// `LocalLimitExec` can break the input consuming loop inside whole stage codegen and
Contributor: By "...break the input consuming loop..." you mean it stops processing and breaks out of whole-stage codegen? We may need to word this slightly differently :)...

// trigger the resource releasing work, after we consume `limit` rows.
execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
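The new comment above describes a delegation chain: CollectLimitExec.executeCollect calls its child's executeTake, so the extra LocalLimitExec wrapper adds no work on the collect path while still capping the consume loop inside whole-stage codegen. Below is a self-contained toy model of that chain; the names mirror the Spark operators, but the row type and structure are simplified for illustration and are not the real API.

// Toy model of the executeCollect -> executeTake chain (illustrative only;
// the real operators work on InternalRow and RDDs, not plain arrays).
trait Plan { def executeTake(n: Int): Array[Int] }

case class Leaf(rows: Array[Int]) extends Plan {
  def executeTake(n: Int): Array[Int] = rows.take(n)
}

// Mirrors BaseLimitExec.executeTake: never ask the child for more than `limit` rows.
case class LocalLimit(limit: Int, child: Plan) extends Plan {
  def executeTake(n: Int): Array[Int] = child.executeTake(math.min(n, limit))
}

// Mirrors CollectLimitExec.executeCollect: delegate straight to the child's
// executeTake, which is why wrapping the child in LocalLimit is a no-op here.
case class CollectLimit(limit: Int, child: Plan) {
  def executeCollect(): Array[Int] = child.executeTake(limit)
}

object ChainDemo extends App {
  val plan = CollectLimit(2, LocalLimit(2, Leaf(Array(1, 2, 3, 4, 5))))
  println(plan.executeCollect().mkString(", ")) // prints: 1, 2
}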
@@ -474,6 +474,10 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
}

private def supportCodegen(plan: SparkPlan): Boolean = plan match {
// Do not enable whole stage codegen for a single limit.
case limit: BaseLimitExec if !limit.child.isInstanceOf[CodegenSupport] ||
!limit.child.asInstanceOf[CodegenSupport].supportCodegen =>
Contributor: We can also override LocalLimitExec.supportCodegen and make that depend on child.supportCodegen. That seems more elegant than special-casing it here.

false
case plan: CodegenSupport if plan.supportCodegen =>
val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
// the generated code will be huge if there are too many columns
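A sketch of the alternative the Contributor suggests above, assuming the Spark 2.x CodegenSupport trait; this would be a hypothetical override inside LocalLimitExec, not the change that was merged:

// Hypothetical: let LocalLimitExec report codegen support only when its
// child can participate, so CollapseCodegenStages needs no special case.
override def supportCodegen: Boolean = child match {
  case c: CodegenSupport => c.supportCodegen
  case _ => false
}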
@@ -54,6 +54,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
val limit: Int
override def output: Seq[Attribute] = child.output

override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))

override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
Member: executeTake looks good. But should executeCollect be the same for LocalLimitExec and GlobalLimitExec?

doExecute is an example: for LocalLimitExec it takes limit rows in each partition, while for GlobalLimitExec it takes limit rows in a single partition.

Previously executeCollect retrieved limit rows from each partition; after this change, executeCollect for LocalLimitExec retrieves only limit rows.

Member: This fix seems to rely on CollectLimitExec.executeCollect calling LocalLimitExec.executeTake. Looks like we don't need to change executeCollect?

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
iter.take(limit)
}
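For context on the Local/Global distinction raised above: both operators share this per-partition doExecute, and GlobalLimitExec only behaves as a global limit because it requires its input in a single partition. Roughly, simplified from the Spark 2.x definitions:

case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec
// the inherited doExecute caps every partition at `limit` rows

case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
  // Pulling all rows into one partition turns the same per-partition
  // iter.take(limit) into a global cap.
  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
}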
@@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
}
}

test("SPARK-21743: top-most limit should not cause memory leak") {
// In unit tests, Spark will fail the query if a memory leak is detected.
@gatorsmile (Member), Aug 16, 2017:
Without the fix, the test did not fail, but I saw the warning message:

22:05:07.455 WARN org.apache.spark.executor.Executor: Managed memory leak detected; size = 33554432 bytes, TID = 2

With the fix, the warning message is gone.

Contributor (Author): Did you try this test in the spark shell? We only throw an exception for a memory leak if spark.unsafe.exceptionOnMemoryLeak is true. But this config is false by default, and true in unit tests.
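To check which mode a given session is in before trusting a local reproduction, a small sketch (assuming an active session named spark, as in the shell):

// The flag lives on the core SparkConf, not the SQL conf; when false, a leak
// only produces the Executor warning instead of failing the query.
val leakFlag = spark.sparkContext.getConf
  .getBoolean("spark.unsafe.exceptionOnMemoryLeak", defaultValue = false)
println(s"exceptionOnMemoryLeak = $leakFlag")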

Member: When this is executed in IntelliJ as well, this test does not fail. How about this?

class SQLQuerySparkContextSuite extends QueryTest with LocalSparkContext {
  val spark = SparkSession
    .builder()
    .config("spark.unsafe.exceptionOnMemoryLeak", "true")
    .master("local[1]")
    .getOrCreate()
  test("SPARK-21743: top-most limit should not cause memory leak") {
    spark.range(100).groupBy("id").count().limit(1).collect()
  }
}

Member: That should be fine, as long as our test framework can capture it. :)

Member: This issue is fixed in #18967. I think we need to move this test case to DataFrameSuite.

@sameeragarwal (Member), Aug 16, 2017: How about just adding it in TestSparkSession instead? Ah, seems like Xiao already did that.

spark.range(100).groupBy("id").count().limit(1).collect()
}
}