@@ -122,11 +122,11 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
Nil
}

// During global limit, try to evenly distribute limited rows across data
// partitions. If disabled, scanning data partitions sequentially until reaching limit number.
// Besides, if child output has certain ordering, we can't evenly pick up rows from
// each partition.
val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
// This is an optimization to evenly distribute limited rows across all partitions.
// When enabled, Spark repeatedly takes a row from each partition in turn until the
// limit is reached. When disabled, Spark takes all rows from the first partition,
// then from the second partition, and so on, until the limit is reached.
val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit

val shuffled = new ShuffledRowRDD(shuffleDependency)

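To make the difference concrete, here is a minimal sketch of the two limit strategies described in the comment above, modeled on plain Scala collections rather than Spark's actual partition iterators. The names sequentialLimit and flatLimit are invented for this illustration and are not part of Spark.

// Illustrative sketch only: `partitions` stands in for the rows available in
// each data partition of the shuffled RDD.
object LimitStrategies {
  // Flat global limit disabled: take all rows from the first partition, then
  // the second, ..., until `limit` rows are collected.
  def sequentialLimit[T](partitions: Seq[Seq[T]], limit: Int): Seq[T] =
    partitions.flatten.take(limit)

  // Flat global limit enabled: take rows from each partition in round-robin
  // order until `limit` rows are collected.
  def flatLimit[T](partitions: Seq[Seq[T]], limit: Int): Seq[T] = {
    val iters = partitions.map(_.iterator)
    val out = scala.collection.mutable.ArrayBuffer.empty[T]
    var progressed = true
    while (out.size < limit && progressed) {
      progressed = false
      iters.foreach { it =>
        if (out.size < limit && it.hasNext) {
          out += it.next()
          progressed = true
        }
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
    println(sequentialLimit(parts, 4)) // 1, 2, 3, 4 (first partition drained first)
    println(flatLimit(parts, 4))       // 1, 4, 7, 2 (roughly one row per partition)
  }
}

The round-robin variant explains why the optimization must be skipped when the child output has a defined ordering: interleaving rows from different partitions would destroy that ordering.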
@@ -22,6 +22,7 @@ import scala.util.Random
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

@@ -31,10 +32,19 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
private var rand: Random = _
private var seed: Long = 0

private val originalLimitFlatGlobalLimit = SQLConf.get.getConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT)

protected override def beforeAll(): Unit = {
super.beforeAll()
seed = System.currentTimeMillis()
rand = new Random(seed)

// Disable the optimization to make Sort-Limit match `TakeOrderedAndProject` semantics.
SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
}

protected override def afterAll(): Unit = {
SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
super.afterAll()
}

private def generateRandomInputData(): DataFrame = {
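The suite above saves the original value of LIMIT_FLAT_GLOBAL_LIMIT in beforeAll and restores it in afterAll so the override cannot leak into other suites. Spark's test utilities also provide a withSQLConf helper that scopes such overrides to a block. The sketch below shows the same set-run-restore idea on a generic mutable config map; ScopedConf and withConf are illustrative names, not Spark's API.

// Illustrative sketch of a scoped "set, run, restore" config helper.
object ScopedConf {
  private val conf = scala.collection.mutable.Map[String, String]()

  def set(key: String, value: String): Unit = conf(key) = value
  def get(key: String): Option[String] = conf.get(key)
  def unset(key: String): Unit = conf -= key

  // Temporarily apply `pairs`, run `body`, then restore the previous values
  // even if `body` throws.
  def withConf[A](pairs: (String, String)*)(body: => A): A = {
    val saved = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => set(k, v) }
    try body
    finally saved.foreach {
      case (k, Some(v)) => set(k, v)
      case (k, None)    => unset(k)
    }
  }

  def main(args: Array[String]): Unit = {
    set("limit.flatGlobalLimit", "true")
    withConf("limit.flatGlobalLimit" -> "false") {
      println(get("limit.flatGlobalLimit")) // Some(false) inside the block
    }
    println(get("limit.flatGlobalLimit"))   // Some(true), restored afterwards
  }
}

The beforeAll/afterAll form used in the diff trades this per-block scoping for a single override across the whole suite, which fits here because every test in TakeOrderedAndProjectSuite needs the optimization disabled.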