diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 392ca13724bc..fb46970e38f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -122,11 +122,11 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
       Nil
     }
 
-    // During global limit, try to evenly distribute limited rows across data
-    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
-    // Besides, if child output has certain ordering, we can't evenly pick up rows from
-    // each parititon.
-    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
+    // This is an optimization to evenly distribute limited rows across all partitions.
+    // When enabled, Spark repeatedly takes rows from each partition until it reaches
+    // the limit. When disabled, Spark takes all rows from the first partition, then
+    // from the second partition, and so on, until it reaches the limit.
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit
 
     val shuffled = new ShuffledRowRDD(shuffleDependency)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 7e317a4d8026..0a1c94cc4ccf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -31,10 +32,19 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
   private var rand: Random = _
   private var seed: Long = 0
 
+  private val originalLimitFlatGlobalLimit = SQLConf.get.getConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT)
+
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     seed = System.currentTimeMillis()
     rand = new Random(seed)
+
+    // Disable the optimization to make Sort-Limit match `TakeOrderedAndProject` semantics.
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
+  }
+
+  protected override def afterAll(): Unit = {
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
   }
 
   private def generateRandomInputData(): DataFrame = {
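For reference, the behavioral difference behind `flatGlobalLimit` can be seen with a minimal sketch. Everything below is illustrative and not part of the patch: the object name, the local session, and the 4-partition `spark.range` input are assumptions; only `SQLConf.LIMIT_FLAT_GLOBAL_LIMIT` and the `SQLConf.get.setConf`/`getConf` pattern are taken from the diff above.

// Hypothetical demo, not part of this PR: shows how LIMIT 8 over four
// partitions behaves with the flat global limit enabled vs. disabled.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object FlatGlobalLimitDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("FlatGlobalLimitDemo")
      .getOrCreate()

    // ids 0..99 spread over 4 partitions: partition 0 holds 0..24, and so on.
    val df = spark.range(0, 100, 1, 4).toDF("id")

    // Flat global limit enabled: rows may be drawn from several partitions,
    // so LIMIT 8 need not return ids 0..7.
    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, true)
    df.limit(8).show()

    // Disabled: partitions are consumed in order, so the first partition
    // alone satisfies LIMIT 8 and the result is exactly ids 0..7.
    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
    df.limit(8).show()

    spark.stop()
  }
}

This mirrors the updated test suite, which must pin the flag to false: `TakeOrderedAndProject` always returns the globally smallest rows, so the Sort-Limit reference plan only matches it when partitions are consumed sequentially.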