
Commit 5c27b0d

viirya authored and cloud-fan committed
[SPARK-19355][SQL][FOLLOWUP] Remove the child.outputOrdering check in global limit
## What changes were proposed in this pull request?

This is based on the discussion at https://github.com/apache/spark/pull/16677/files#r212805327. As the SQL standard doesn't mandate that a nested ORDER BY followed by a LIMIT has to respect that ordering clause, this patch removes the `child.outputOrdering` check.

## How was this patch tested?

Unit tests.

Closes apache#22239 from viirya/improve-global-limit-parallelism-followup.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5cdb8a2 commit 5c27b0d

File tree

2 files changed: +15 −5 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 5 additions & 5 deletions
```diff
@@ -122,11 +122,11 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
       Nil
     }
 
-    // During global limit, try to evenly distribute limited rows across data
-    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
-    // Besides, if child output has certain ordering, we can't evenly pick up rows from
-    // each parititon.
-    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
+    // This is an optimization to evenly distribute limited rows across all partitions.
+    // When enabled, Spark goes to take rows at each partition repeatedly until reaching
+    // limit number. When disabled, Spark takes all rows at first partition, then rows
+    // at second partition ..., until reaching limit number.
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit
 
     val shuffled = new ShuffledRowRDD(shuffleDependency)
```
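The two row-picking strategies described in the new comment can be illustrated with a small pure-Scala sketch. This is not the Spark implementation (which operates on `ShuffledRowRDD` partitions); it is a hypothetical model where partitions are plain `Seq`s, written only to contrast the enabled and disabled behavior of the flat global limit.

```scala
// Hypothetical sketch (not Spark API): partitions modeled as Seq[Seq[A]].
object FlatLimitSketch {
  // Flat global limit enabled: take rows from each partition in round-robin
  // passes until `limit` rows are collected, spreading the work evenly.
  def flatLimit[A](partitions: Seq[Seq[A]], limit: Int): Seq[A] = {
    val iters = partitions.map(_.iterator)
    val out = scala.collection.mutable.ArrayBuffer.empty[A]
    var progress = true
    while (out.size < limit && progress) {
      progress = false
      for (it <- iters if out.size < limit && it.hasNext) {
        out += it.next()
        progress = true
      }
    }
    out.toSeq
  }

  // Flat global limit disabled: exhaust the first partition, then the
  // second, ... until `limit` rows are collected.
  def sequentialLimit[A](partitions: Seq[Seq[A]], limit: Int): Seq[A] =
    partitions.flatten.take(limit)
}
```

With partitions `Seq(Seq(1,2,3), Seq(4,5,6), Seq(7,8,9))` and limit 4, `flatLimit` picks one row per partition per pass and returns `Seq(1, 4, 7, 2)`, while `sequentialLimit` drains the first partition first and returns `Seq(1, 2, 3, 4)` — which is why, before this patch, the flat path was guarded against ordered child output.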
sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -31,10 +32,19 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
   private var rand: Random = _
   private var seed: Long = 0
 
+  private val originalLimitFlatGlobalLimit = SQLConf.get.getConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT)
+
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     seed = System.currentTimeMillis()
     rand = new Random(seed)
+
+    // Disable the optimization to make Sort-Limit match `TakeOrderedAndProject` semantics.
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
+  }
+
+  protected override def afterAll() = {
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
   }
 
   private def generateRandomInputData(): DataFrame = {
```
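The test change follows a common save-and-restore pattern: capture the original config value before the suite runs, override it in `beforeAll`, and restore it in `afterAll` so the override can't leak into other suites. A minimal sketch of the same idea, using a toy mutable config in place of the real `SQLConf` (the names here are hypothetical, not Spark API):

```scala
// Toy stand-in for a global config store like SQLConf (hypothetical, not Spark API).
object ToyConf {
  private val settings =
    scala.collection.mutable.Map[String, Boolean]("limitFlatGlobalLimit" -> true)
  def getConf(key: String): Boolean = settings(key)
  def setConf(key: String, value: Boolean): Unit = settings(key) = value
}

object ConfUtil {
  // Save-and-restore: override `key` for the duration of `body`, then put
  // the original value back even if `body` throws.
  def withConf[T](key: String, value: Boolean)(body: => T): T = {
    val original = ToyConf.getConf(key) // save the current value
    ToyConf.setConf(key, value)         // apply the override
    try body
    finally ToyConf.setConf(key, original) // always restore
  }
}
```

The suite in this commit spreads the same logic across `beforeAll`/`afterAll` instead of a `withConf`-style helper because the override must hold for every test in the suite, not just one block.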
