Skip to content

Commit 4c5158e

Browse files
rxin authored and gatorsmile committed
[SPARK-21644][SQL] LocalLimit.maxRows is defined incorrectly
## What changes were proposed in this pull request? The definition of `maxRows` in `LocalLimit` operator was simply wrong. This patch introduces a new `maxRowsPerPartition` method and uses that in pruning. The patch also adds more documentation on why we need local limit vs global limit. Note that this previously has never been a bug because the way the code is structured, but future use of the maxRows could lead to bugs. ## How was this patch tested? Should be covered by existing test cases. Closes #18851 Author: gatorsmile <gatorsmile@gmail.com> Author: Reynold Xin <rxin@databricks.com> Closes #19393 from gatorsmile/pr-18851.
1 parent fa225da commit 4c5158e

File tree

4 files changed

+74
-12
lines changed

4 files changed

+74
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -305,13 +305,20 @@ object LimitPushDown extends Rule[LogicalPlan] {
305305
}
306306
}
307307

308-
private def maybePushLimit(limitExp: Expression, plan: LogicalPlan): LogicalPlan = {
309-
(limitExp, plan.maxRows) match {
310-
case (IntegerLiteral(maxRow), Some(childMaxRows)) if maxRow < childMaxRows =>
308+
private def maybePushLocalLimit(limitExp: Expression, plan: LogicalPlan): LogicalPlan = {
309+
(limitExp, plan.maxRowsPerPartition) match {
310+
case (IntegerLiteral(newLimit), Some(childMaxRows)) if newLimit < childMaxRows =>
311+
// If the child has a cap on max rows per partition and the cap is larger than
312+
// the new limit, put a new LocalLimit there.
311313
LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
314+
312315
case (_, None) =>
316+
// If the child has no cap, put the new LocalLimit.
313317
LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
314-
case _ => plan
318+
319+
case _ =>
320+
// Otherwise, don't put a new LocalLimit.
321+
plan
315322
}
316323
}
317324

@@ -323,7 +330,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
323330
// pushdown Limit through it. Once we add UNION DISTINCT, however, we will not be able to
324331
// pushdown Limit.
325332
case LocalLimit(exp, Union(children)) =>
326-
LocalLimit(exp, Union(children.map(maybePushLimit(exp, _))))
333+
LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _))))
327334
// Add extra limits below OUTER JOIN. For LEFT OUTER and FULL OUTER JOIN we push limits to the
328335
// left and right sides, respectively. For FULL OUTER JOIN, we can only push limits to one side
329336
// because we need to ensure that rows from the limited side still have an opportunity to match
@@ -335,19 +342,19 @@ object LimitPushDown extends Rule[LogicalPlan] {
335342
// - If neither side is limited, limit the side that is estimated to be bigger.
336343
case LocalLimit(exp, join @ Join(left, right, joinType, _)) =>
337344
val newJoin = joinType match {
338-
case RightOuter => join.copy(right = maybePushLimit(exp, right))
339-
case LeftOuter => join.copy(left = maybePushLimit(exp, left))
345+
case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
346+
case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
340347
case FullOuter =>
341348
(left.maxRows, right.maxRows) match {
342349
case (None, None) =>
343350
if (left.stats.sizeInBytes >= right.stats.sizeInBytes) {
344-
join.copy(left = maybePushLimit(exp, left))
351+
join.copy(left = maybePushLocalLimit(exp, left))
345352
} else {
346-
join.copy(right = maybePushLimit(exp, right))
353+
join.copy(right = maybePushLocalLimit(exp, right))
347354
}
348355
case (Some(_), Some(_)) => join
349-
case (Some(_), None) => join.copy(left = maybePushLimit(exp, left))
350-
case (None, Some(_)) => join.copy(right = maybePushLimit(exp, right))
356+
case (Some(_), None) => join.copy(left = maybePushLocalLimit(exp, left))
357+
case (None, Some(_)) => join.copy(right = maybePushLocalLimit(exp, right))
351358

352359
}
353360
case _ => join

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ abstract class LogicalPlan
9797
*/
9898
def maxRows: Option[Long] = None
9999

100+
/**
101+
* Returns the maximum number of rows this plan may compute on each partition.
102+
*/
103+
def maxRowsPerPartition: Option[Long] = maxRows
104+
100105
/**
101106
* Returns true if this expression and all its children have been resolved to a specific schema
102107
* and false if it still contains any unresolved placeholders. Implementations of LogicalPlan

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ object Union {
191191
}
192192
}
193193

194+
/**
195+
* Logical plan for unioning two plans, without a distinct. This is UNION ALL in SQL.
196+
*/
194197
case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
195198
override def maxRows: Option[Long] = {
196199
if (children.exists(_.maxRows.isEmpty)) {
@@ -200,6 +203,17 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
200203
}
201204
}
202205

206+
/**
207+
* Note the definition has assumption about how union is implemented physically.
208+
*/
209+
override def maxRowsPerPartition: Option[Long] = {
210+
if (children.exists(_.maxRowsPerPartition.isEmpty)) {
211+
None
212+
} else {
213+
Some(children.flatMap(_.maxRowsPerPartition).sum)
214+
}
215+
}
216+
203217
// updating nullability to make all the children consistent
204218
override def output: Seq[Attribute] =
205219
children.map(_.output).transpose.map(attrs =>
@@ -669,6 +683,27 @@ case class Pivot(
669683
}
670684
}
671685

686+
/**
687+
* A constructor for creating a logical limit, which is split into two separate logical nodes:
688+
* a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]].
689+
*
690+
* This muds the water for clean logical/physical separation, and is done for better limit pushdown.
691+
* In distributed query processing, a non-terminal global limit is actually an expensive operation
692+
* because it requires coordination (in Spark this is done using a shuffle).
693+
*
694+
* In most cases when we want to push down limit, it is often better to only push some partition
695+
* local limit. Consider the following:
696+
*
697+
* GlobalLimit(Union(A, B))
698+
*
699+
* It is better to do
700+
* GlobalLimit(Union(LocalLimit(A), LocalLimit(B)))
701+
*
702+
* than
703+
* Union(GlobalLimit(A), GlobalLimit(B)).
704+
*
705+
* So we introduced LocalLimit and GlobalLimit in the logical plan node for limit pushdown.
706+
*/
672707
object Limit {
673708
def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
674709
GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
@@ -682,6 +717,11 @@ object Limit {
682717
}
683718
}
684719

720+
/**
721+
* A global (coordinated) limit. This operator can emit at most `limitExpr` number in total.
722+
*
723+
* See [[Limit]] for more information.
724+
*/
685725
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
686726
override def output: Seq[Attribute] = child.output
687727
override def maxRows: Option[Long] = {
@@ -692,9 +732,16 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
692732
}
693733
}
694734

735+
/**
736+
* A partition-local (non-coordinated) limit. This operator can emit at most `limitExpr` number
737+
* of tuples on each physical partition.
738+
*
739+
* See [[Limit]] for more information.
740+
*/
695741
case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
696742
override def output: Seq[Attribute] = child.output
697-
override def maxRows: Option[Long] = {
743+
744+
override def maxRowsPerPartition: Option[Long] = {
698745
limitExpr match {
699746
case IntegerLiteral(limit) => Some(limit)
700747
case _ => None

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
554554

555555
/**
556556
* Physical plan for unioning two plans, without a distinct. This is UNION ALL in SQL.
557+
*
558+
* If we change how this is implemented physically, we'd need to update
559+
* [[org.apache.spark.sql.catalyst.plans.logical.Union.maxRowsPerPartition]].
557560
*/
558561
case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
559562
override def output: Seq[Attribute] =

0 commit comments

Comments (0)