Changes from 2 commits
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*/
-class RangePartitioner[K <% Ordered[K]: ClassTag, V](
+class RangePartitioner[K : Ordering : ClassTag, V](
Contributor: Can the user still pass in an Ordering if they want a specific Ordering?

Contributor: Yes, when de-sugared there is an implicit Ordering -- that's why Michael can recover and bind it explicitly at line 98. And yes, it can be overridden in the normal way.
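To illustrate that de-sugaring (a sketch with hypothetical names, not code from this PR): a context bound `K : Ordering` becomes an extra implicit parameter list, which the class body can rebind and callers can override.

// Sugared form: the context bound asks for an implicit Ordering[K].
class Sorter[K : Ordering](keys: Seq[K]) {
  // Recover and bind the implicit explicitly, as the PR does in RangePartitioner.
  private val ordering = implicitly[Ordering[K]]
  def sorted: Seq[K] = keys.sorted(ordering)
}

// De-sugared equivalent: an implicit parameter list.
class SorterDesugared[K](keys: Seq[K])(implicit ord: Ordering[K])

// Overriding in the normal way: pass an Ordering explicitly...
val byLength: Ordering[String] = Ordering.by(_.length)
new Sorter(Seq("bb", "a"))(byLength).sorted      // Seq("a", "bb")

// ...or shadow the default with a closer implicit.
implicit val reversed: Ordering[Int] = Ordering[Int].reverse
new Sorter(Seq(1, 3, 2)).sorted                  // Seq(3, 2, 1)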

partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
extends Partitioner {

+private val ordering = implicitly[Ordering[K]]
+
// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] = {
if (partitions == 1) {
@@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
val rddSize = rdd.count()
val maxSampleSize = partitions * 20.0
val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
if (rddSample.length == 0) {
Array()
} else {
@@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
var partition = 0
if (rangeBounds.length < 1000) {
// If we have less than 1000 range bounds, use a naive linear search
-while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
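For context, a hedged usage sketch (the local-mode setup and values are illustrative, not from this PR): with the context bound, RangePartitioner accepts any key type that has an implicit Ordering in scope. The sampling arithmetic above caps the sample at partitions * 20 keys, so with 2 partitions and 1,000 records, frac = min(40.0 / 1000, 1.0) = 0.04.

import org.apache.spark.{RangePartitioner, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "range-partitioner-sketch")
val pairs = sc.parallelize(Seq(("banana", 1), ("apple", 2), ("cherry", 3)))

// Ordering[String] is resolved implicitly; the bounds come from sampling `pairs`.
val partitioner = new RangePartitioner(2, pairs)
val partitioned = pairs.partitionBy(partitioner)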
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1267,7 +1267,7 @@ object SparkContext extends Logging {
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

-implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
+implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)

26 changes: 21 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -24,15 +24,31 @@ import org.apache.spark.{Logging, RangePartitioner}
/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
* an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
-* use these functions. They will work with any key type that has a `scala.math.Ordered`
-* implementation.
+* use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
+* scope. Ordering objects already exist for all of the standard primitive types. Users can also
+* define their own orderings for custom types, or to override the default ordering. The implicit
+* ordering that is in the closest scope will be used.
+*
+* {{{
+* import org.apache.spark.SparkContext._
+*
+* val rdd: RDD[(String, Int)] = ...
+* implicit val caseInsensitiveOrdering = new Ordering[String] {
+*   override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
+* }
+*
+* // Sort by key, using the above case insensitive ordering.
+* rdd.sortByKey
Contributor: A nit: put parentheses around sortByKey.

+* }}}
*/
-class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+class OrderedRDDFunctions[K : Ordering : ClassTag,
Contributor: @marmbrus Do you mind updating the scaladoc above since now users can pass their own orderings in addition to an Ordered type? It might be nice to just give a one-line example of how they can define a custom ordering as well.

Contributor (Author): Done.

V: ClassTag,
P <: Product2[K, V] : ClassTag](
self: RDD[P])
extends Logging with Serializable {

+private val ordering = implicitly[Ordering[K]]
+
/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
@@ -45,9 +61,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
shuffled.mapPartitions(iter => {
val buf = iter.toArray
if (ascending) {
-buf.sortWith((x, y) => x._1 < y._1).iterator
+buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
} else {
-buf.sortWith((x, y) => x._1 > y._1).iterator
+buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
}
}, preservesPartitioning = true)
}
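A brief usage note (a sketch, reusing `sc` from the earlier example): because the implicit Ordering[K] is captured at the call site by rddToOrderedRDDFunctions, shadowing it flips the sort without touching the ascending flag.

import org.apache.spark.SparkContext._

// The closest implicit wins over the default Ordering[Int] from the companion.
implicit val descending: Ordering[Int] = Ordering[Int].reverse
val ranked = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"))).sortByKey()
// ranked.collect() => Array((3,c), (2,b), (1,a))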
@@ -23,7 +23,7 @@ import scala.Array
import scala.reflect._

private[spark] object CollectionsUtils {
-def makeBinarySearch[K <% Ordered[K] : ClassTag] : (Array[K], K) => Int = {
+def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
classTag[K] match {
case ClassTag.Float =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
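The diff is truncated here. As a hedged sketch of what an Ordering-driven search makes possible (this helper is hypothetical, not the PR's code), a lower-bound binary search over the range bounds matches the semantics of the linear scan in RangePartitioner.getPartition:

// Returns the first index i with ord.gteq(bounds(i), key), i.e. the
// partition a key falls into; equivalent to the naive linear scan.
def searchBounds[K](bounds: Array[K], key: K)(implicit ord: Ordering[K]): Int = {
  var lo = 0
  var hi = bounds.length
  while (lo < hi) {
    val mid = (lo + hi) >>> 1
    if (ord.lt(bounds(mid), key)) lo = mid + 1 else hi = mid
  }
  lo
}

searchBounds(Array(10, 20), 5)   // 0: below the first bound
searchBounds(Array(10, 20), 15)  // 1: between the bounds
searchBounds(Array(10, 20), 25)  // 2: above the last bound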