Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert "[SPARK-19355][SQL][FOLLOWUP] Remove the child.outputOrdering …
…check in global limit"

This reverts commit 5c27b0d.
  • Loading branch information
viirya committed Sep 20, 2018
commit 2dae33e5b897c0ec05f675ec565abee5f2c4ea34
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
Nil
}

// This is an optimization to evenly distribute limited rows across all partitions.
// When enabled, Spark repeatedly takes rows from each partition in turn until the
// limit is reached. When disabled, Spark takes all rows from the first partition, then
// from the second partition, and so on, until the limit is reached.
val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit
// During global limit, try to evenly distribute limited rows across data
// partitions. If disabled, scan data partitions sequentially until the limit is reached.
// Besides, if the child output has a certain ordering, we can't evenly pick up rows from
// each partition.
val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil

val shuffled = new ShuffledRowRDD(shuffleDependency)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import scala.util.Random
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

Expand All @@ -32,19 +31,10 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
private var rand: Random = _
private var seed: Long = 0

private val originalLimitFlatGlobalLimit = SQLConf.get.getConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT)

// Suite-wide setup: runs the parent setup first, then seeds the shared random generator
// and turns off the flat global limit setting for the tests in this suite.
protected override def beforeAll(): Unit = {
// Parent setup runs first, before SQLConf is touched below.
super.beforeAll()
// Seed comes from the wall clock and is kept in a field.
// NOTE(review): presumably kept so a failing run can be reproduced -- confirm.
seed = System.currentTimeMillis()
rand = new Random(seed)

// Disable the optimization to make Sort-Limit match `TakeOrderedAndProject` semantics.
SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
}

// Suite-wide teardown: puts LIMIT_FLAT_GLOBAL_LIMIT back to the value captured in
// `originalLimitFlatGlobalLimit`, then delegates to the parent teardown.
protected override def afterAll(): Unit = {
  try {
    // Restore the conf before the parent teardown runs.
    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
  } finally {
    // Mirror `beforeAll`, which calls `super.beforeAll()`: always run the parent
    // teardown, even if restoring the conf throws.
    super.afterAll()
  }
}

private def generateRandomInputData(): DataFrame = {
Expand Down