Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Moved sort updates from basicOperators.scala to sort.scala
  • Loading branch information
Ilya Ganelin committed Jul 31, 2015
commit 71eef54b05fd4f949a4a5d0d65072887174c4666
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.ExternalSorterNoAgg
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
import org.apache.spark.util.{CompletionIterator, MutablePair}
Expand Down Expand Up @@ -220,139 +220,6 @@ case class TakeOrderedAndProject(
override def outputOrdering: Seq[SortOrder] = sortOrder
}

/**
 * :: DeveloperApi ::
 * Sorts every partition of the child plan entirely on the JVM heap (no spilling).
 *
 * @param sortOrder the ordering expressions to sort by.
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 * @param child the physical plan producing the rows to be sorted.
 */
@DeveloperApi
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {

  // A global sort requires range-partitioned input; a per-partition sort accepts anything.
  override def requiredChildDistribution: Seq[Distribution] =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
    child.execute().mapPartitions({ iter =>
      val rowOrdering = newOrdering(sortOrder, child.output)
      // Copy each row before buffering: upstream operators may reuse the same row object.
      val buffered = iter.map(_.copy()).toArray
      buffered.sorted(rowOrdering).iterator
    }, preservesPartitioning = true)
  }

  override def output: Seq[Attribute] = child.output

  override def outputOrdering: Seq[SortOrder] = sortOrder
}

/**
 * :: DeveloperApi ::
 * Performs a sort, spilling sorted runs to disk as needed to bound memory use.
 *
 * @param sortOrder the ordering expressions to sort by.
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 * @param child the physical plan producing the rows to be sorted.
 */
@DeveloperApi
case class ExternalSort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {

  override def requiredChildDistribution: Seq[Distribution] =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      val sorter =
        new ExternalSorterNoAgg[InternalRow, Null, InternalRow](ordering = Some(ordering))
      // copy() with explicit parens (it allocates a new row; not a pure accessor) —
      // required because upstream operators may reuse the same row object.
      sorter.insertAll(iterator.map(r => (r.copy(), null)))
      val baseIterator = sorter.iterator.map(_._1)
      // CompletionIterator guarantees sorter.stop() runs (releasing spill files) once
      // the consumer exhausts the iterator.
      // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
    }, preservesPartitioning = true)
  }

  override def output: Seq[Attribute] = child.output

  override def outputOrdering: Seq[SortOrder] = sortOrder
}

/**
 * :: DeveloperApi ::
 * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
 * Project Tungsten).
 *
 * @param sortOrder the ordering expressions to sort by.
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 * @param child the physical plan producing the rows to be sorted.
 * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
 *                           spill every `frequency` records.
 */
@DeveloperApi
case class UnsafeExternalSort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan,
    testSpillFrequency: Int = 0)
  extends UnaryNode {

  private[this] val schema: StructType = child.schema

  override def requiredChildDistribution: Seq[Distribution] =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
    assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
    def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
      val ordering = newOrdering(sortOrder, child.output)
      // Only the leading sort column's prefix is used for the radix-style fast path.
      val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
      // Hack until we generate separate comparator implementations for ascending vs. descending
      // (or choose to codegen them):
      val prefixComparator = {
        val comp = SortPrefixUtils.getPrefixComparator(boundSortExpression)
        if (sortOrder.head.direction == Descending) {
          new PrefixComparator {
            // Reverse by swapping the arguments rather than negating the result:
            // -1 * Int.MinValue overflows back to Int.MinValue, which would keep the
            // ascending sign and produce an inconsistent (wrongly ordered) comparator.
            override def compare(p1: Long, p2: Long): Int = comp.compare(p2, p1)
          }
        } else {
          comp
        }
      }
      val prefixComputer = {
        val prefixComputer = SortPrefixUtils.getPrefixComputer(boundSortExpression)
        new UnsafeExternalRowSorter.PrefixComputer {
          override def computePrefix(row: InternalRow): Long = prefixComputer(row)
        }
      }
      val sorter = new UnsafeExternalRowSorter(schema, ordering, prefixComparator, prefixComputer)
      if (testSpillFrequency > 0) {
        sorter.setTestSpillFrequency(testSpillFrequency)
      }
      sorter.sort(iterator)
    }
    child.execute().mapPartitions(doSort, preservesPartitioning = true)
  }

  override def output: Seq[Attribute] = child.output

  override def outputOrdering: Seq[SortOrder] = sortOrder

  // Rows are emitted in Tungsten's UnsafeRow binary format.
  override def outputsUnsafeRows: Boolean = true
}

@DeveloperApi
object UnsafeExternalSort {
  /**
   * Whether [[UnsafeExternalSort]] is able to sort rows conforming to the given schema
   * (delegates the decision to the underlying binary row sorter).
   */
  def supportsSchema(schema: StructType): Boolean = UnsafeExternalRowSorter.supportsSchema(schema)
}


/**
* :: DeveloperApi ::
* Return a new RDD that has exactly `numPartitions` partitions.
Expand Down
36 changes: 21 additions & 15 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,32 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution, Distribution}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.ExternalSorterNoAgg

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines various sort operators.
////////////////////////////////////////////////////////////////////////////////////////////////////


/**
* Performs a sort on-heap.
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
*/
* :: DeveloperApi ::
* Performs a sort on-heap.
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
*/
@DeveloperApi
case class Sort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
extends UnaryNode {
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
Expand All @@ -57,14 +60,16 @@ case class Sort(
}

/**
* :: DeveloperApi ::
* Performs a sort, spilling to disk as needed.
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
*/
@DeveloperApi
case class ExternalSort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
extends UnaryNode {

override def requiredChildDistribution: Seq[Distribution] =
Expand All @@ -73,8 +78,9 @@ case class ExternalSort(
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r.copy(), null)))
val sorter =
new ExternalSorterNoAgg[InternalRow, Null, InternalRow](ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r.copy, null)))
val baseIterator = sorter.iterator.map(_._1)
// TODO(marmbrus): The complex type signature below thwarts inference for no reason.
CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.