sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

@@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.TakeOrderedAndProjectExec(
           limit, order, projectList, planLater(child)) :: Nil
       case logical.Limit(IntegerLiteral(limit), child) =>
-        execution.CollectLimitExec(limit, planLater(child)) :: Nil
+        // Normally wrapping child with `LocalLimitExec` here is a no-op, because
+        // `CollectLimitExec.executeCollect` will call `LogicalLimitExec.executeTake`, which
+        // calls `child.executeTake`. If child supports whole stage codegen, adding this
+        // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
+        // resource releasing work, after we consume `limit` rows.
+        execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
       case other => planLater(other) :: Nil
     }
   case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>

Member (on the new code comment): typo? LogicalLimitExec -> LocalLimitExec.
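
To see what this planner change does, it helps to look at the resulting physical plan. A minimal sketch, not from this PR: it assumes a live SparkSession named `spark`, and the exact explain() output varies across Spark versions, so the plan shape in the comments below is approximate.

// Illustrative only: a top-most limit over a whole-stage-codegen'd aggregate.
val df = spark.range(100).groupBy("id").count().limit(1)

df.explain()
// Approximate expected shape after this change:
//   CollectLimit 1
//   +- LocalLimit 1               <- the wrapper added by this rule
//      +- *HashAggregate(...)     <- whole-stage-codegen'd subtree
//         +- ...

// Collecting consumes at most `limit` rows; the LocalLimit boundary lets the
// codegen'd subtree stop early and release its execution memory.
df.collect()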
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala (10 additions, 0 deletions)

@@ -54,6 +54,16 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output

+  // Do not enable whole stage codegen for a single limit.
+  override def supportCodegen: Boolean = child match {
+    case plan: CodegenSupport => plan.supportCodegen
+    case _ => false
+  }
+
+  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
+
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

Member: executeTake looks good. But should executeCollect be the same for LocalLimitExec and GlobalLimitExec?

doExecute is an example: for LocalLimitExec it takes `limit` rows in each partition, while for GlobalLimitExec it takes `limit` rows in a single partition.

Previously, executeCollect retrieved `limit` rows from each partition. After this change, executeCollect for LocalLimitExec retrieves only `limit` rows.
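
A minimal, self-contained illustration of the distinction described above, using plain RDDs and hypothetical variable names (not code from this PR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 2)

// LocalLimitExec-style doExecute: take `limit` rows in each partition, so up
// to numPartitions * limit rows survive (here 2 * 3 = 6).
val localStyle = rdd.mapPartitions(_.take(3))
println(localStyle.count()) // 6

// GlobalLimitExec-style doExecute: first shrink to a single partition, then
// take `limit` rows once (here at most 3).
val globalStyle = rdd.coalesce(1).mapPartitions(_.take(3))
println(globalStyle.count()) // 3

This per-partition behavior is why the inherited executeCollect on LocalLimitExec could previously return more than `limit` rows when the child has multiple partitions.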

Member: This fix seems to rely on CollectLimitExec.executeCollect calling LocalLimitExec.executeTake. It looks like we don't need to change executeCollect?
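
To make that dependency concrete, here is a self-contained toy model of the call chain, with simplified types; the names mirror, but do not reproduce, the Spark operators:

object LimitChainSketch {
  // Simplified stand-in for SparkPlan: rows are plain Ints here.
  trait Plan { def executeTake(n: Int): Seq[Int] }

  // Stands in for the (possibly whole-stage-codegen'd) child plan.
  class Source(data: Seq[Int]) extends Plan {
    def executeTake(n: Int): Seq[Int] = data.take(n)
  }

  // Mirrors BaseLimitExec.executeTake in this PR: cap the request at `limit`.
  class LocalLimit(limit: Int, child: Plan) extends Plan {
    def executeTake(n: Int): Seq[Int] = child.executeTake(math.min(n, limit))
  }

  // Mirrors CollectLimitExec.executeCollect: delegate to child.executeTake.
  class CollectLimit(limit: Int, child: Plan) {
    def executeCollect(): Seq[Int] = child.executeTake(limit)
  }

  def main(args: Array[String]): Unit = {
    val plan = new CollectLimit(1, new LocalLimit(1, new Source(1 to 100)))
    println(plan.executeCollect()) // only one row is ever requested
  }
}

Because CollectLimit only ever asks its child for `limit` rows, capping again inside LocalLimit.executeTake is what makes the extra wrapper a no-op for collect, as the code comment in the first hunk says.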


   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

@@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
     }
   }
+
+  test("SPARK-21743: top-most limit should not cause memory leak") {
+    // In unit tests, Spark will fail the query if a memory leak is detected.

@gatorsmile (Member, Aug 16, 2017): Without the fix, the test did not fail, but I saw the warning message:

22:05:07.455 WARN org.apache.spark.executor.Executor: Managed memory leak detected; size = 33554432 bytes, TID = 2

With the fix, the warning message is gone.

Contributor (Author): Did you try this test in the spark shell? We only throw an exception for a memory leak if spark.unsafe.exceptionOnMemoryLeak is true. But this config is false by default, and is true in unit tests.

Member: When this is executed in IntelliJ, the test does not fail either. How about this?

class SQLQuerySparkContextSuite extends QueryTest with LocalSparkContext {
  val spark = SparkSession
    .builder()
    .config("spark.unsafe.exceptionOnMemoryLeak", "true")
    .master("local[1]")
    .getOrCreate()

  test("SPARK-21743: top-most limit should not cause memory leak") {
    spark.range(100).groupBy("id").count().limit(1).collect()
  }
}

Member: That should be fine, as long as our test framework can capture it. :)

Member: This issue is fixed in #18967.

I think we need to move this test case to DataFrameSuite.

@sameeragarwal (Member, Aug 16, 2017): How about just adding it in TestSparkSession instead? Ah, seems like Xiao already did that.

+    spark.range(100).groupBy("id").count().limit(1).collect()
+  }
 }