Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added max and min to StatCounter output, updated doc
  • Loading branch information
dwmclary committed Mar 15, 2014
commit 1a97558b7e8cdb6bb2dc06a815943192c3285acb
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}

/**
 * Returns the maximum element from this RDD as defined by the specified
 * Comparator[T].
 * @param comp the comparator that defines ordering
 * @return the maximum element of the RDD
 */
def max(comp: Comparator[T]): T = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc comments to these and the Scala versions

import scala.collection.JavaConversions._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to import this if you're going to call Ordering.comparatorToOrdering directly -- was it necessary? It was in some other methods because they used other conversions

rdd.max()(Ordering.comparatorToOrdering(comp))
}

/**
 * Returns the minimum element from this RDD as defined by the specified
 * Comparator[T].
 * @param comp the comparator that defines ordering
 * @return the minimum element of the RDD
 */
def min(comp: Comparator[T]): T = {
import scala.collection.JavaConversions._
rdd.min()(Ordering.comparatorToOrdering(comp))
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -958,9 +958,17 @@ abstract class RDD[T: ClassTag](
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
/**
 * Returns the max of this RDD as defined by the implicit Ordering[T].
 * @return the maximum element of the RDD
 */
def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)

def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}
/**
 * Returns the min of this RDD as defined by the implicit Ordering[T].
 * @return the minimum element of the RDD
 */
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)

/**
* Save this RDD as a text file, using string representations of elements.
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/StatCounter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
private var max_v: Double = 0 // Running max of our values
private var min_v: Double = 0 // Running min of our values
private var max_v: Double = Double.NegativeInfinity // Running max of our values; -Inf so any first value becomes the max
private var min_v: Double = Double.PositiveInfinity // Running min of our values; +Inf so any first value becomes the min

merge(values)

Expand Down Expand Up @@ -135,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
def sampleStdev: Double = math.sqrt(sampleVariance)

override def toString: String = {
  // All numeric specifiers must use '%' — '$f' was a typo that breaks String.format
  "(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be %f, not $f

}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(abs(6.0/2 - rdd.mean) < 0.01)
assert(abs(1.0 - rdd.variance) < 0.01)
assert(abs(1.0 - rdd.stdev) < 0.01)
assert(abs(4.0 - stats.max) === 0)
assert(abs(-1.0 - stats.max) === 0)
assert(stats.max === 4.0)
assert(stats.min === -1.0)

// Add other tests here for classes that should be able to handle empty partitions correctly
}
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ def max(self):
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
43.0
"""
return self.stats().max()
return self.reduce(max)

def min(self):
"""
Expand All @@ -552,7 +552,7 @@ def min(self):
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
1.0
"""
return self.stats().min()
return self.reduce(min)

def sum(self):
"""
Expand Down