Consolidate things in mlutils
holdenk committed Apr 9, 2014
commit 91eae64dc0731c845b03f17d052e26214fd9d28f
54 changes: 0 additions & 54 deletions core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala

This file was deleted.

11 changes: 0 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -334,17 +334,6 @@ abstract class RDD[T: ClassTag](
}.toArray
}

/**
* Return a k element list of pairs of RDDs with the first element of each pair
* containing a unique 1/Kth of the data and the second element contain the composite of that.
*/
def kFoldRdds(folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = {
1.to(folds).map(fold => ((
new FoldedRDD(this, fold, folds, seed),
new CompositeFoldedRDD(this, fold, folds, seed)
))).toList
}

def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
31 changes: 0 additions & 31 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -513,37 +513,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}

test("FoldedRDD") {
val data = sc.parallelize(1 to 100, 2)
val lowerFoldedRdd = new FoldedRDD(data, 1, 2, 1)
val upperFoldedRdd = new FoldedRDD(data, 2, 2, 1)
val lowerCompositeFoldedRdd = new CompositeFoldedRDD(data, 1, 2, 1)
assert(lowerFoldedRdd.collect().sorted.size == 50)
assert(lowerCompositeFoldedRdd.collect().sorted.size == 50)
assert(lowerFoldedRdd.subtract(lowerCompositeFoldedRdd).collect().sorted ===
lowerFoldedRdd.collect().sorted)
assert(upperFoldedRdd.collect().sorted.size == 50)
}

test("kfoldRdd") {
val data = sc.parallelize(1 to 100, 2)
val collectedData = data.collect().sorted
for (folds <- 2 to 10) {
for (seed <- 1 to 5) {
val foldedRdds = data.kFoldRdds(folds, seed)
assert(foldedRdds.size === folds)
foldedRdds.map{case (test, train) =>
val result = test.union(train).collect().sorted
assert(result === collectedData,
"Each training+test set combined contains all of the data")
}
// K fold cross validation should only have each element in the test set exactly once
assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted ===
data.collect().sorted)
}
}
}

test("runJob on an invalid partition") {
intercept[IllegalArgumentException] {
sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
32 changes: 28 additions & 4 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -22,6 +22,15 @@ import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect._

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.SparkContext._
import org.apache.spark.util.random.BernoulliSampler

import org.jblas.DoubleMatrix
Contributor: Please move this line to the block of breeze imports. And merge spark and spark.mllib imports in a single block.

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.RegressionModel
Contributor: Remove unused imports.

@@ -176,6 +185,21 @@ object MLUtils {
(a-b)*(a-b)
}

/**
 * Return a k element list of pairs of RDDs, with the first element of each pair
 * containing a unique 1/kth of the data and the second element containing the
 * complement of that fold (the remaining (k-1)/k of the data).
*/
def kFoldRdds[T: ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = {
  val foldsF = folds.toFloat
  1.to(folds).map(fold => (
    new PartitionwiseSampledRDD(rdd,
      new BernoulliSampler[T]((fold - 1) / foldsF, fold / foldsF, false), seed),
    new PartitionwiseSampledRDD(rdd,
      new BernoulliSampler[T]((fold - 1) / foldsF, fold / foldsF, true), seed)
  )).toList
}
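
The pairing above relies on one property of the samplers: given the same seed, the BernoulliSampler over [(fold-1)/k, fold/k) and its complement-flagged twin make the same per-element random draws, so the two RDDs in each pair are exact complements. A minimal self-contained sketch of the idea in plain Scala collections (illustrative only, not Spark's implementation):

import scala.util.Random

// Each element draws one uniform value from a seeded RNG. Fold i keeps the
// elements whose draw lands in [(i-1)/k, i/k); the complement keeps the rest.
// Because the draws depend only on the seed and the element's position, the
// two halves of each pair partition the data, and the k test folds together
// cover every element exactly once.
def foldPair[T](data: Seq[T], fold: Int, k: Int, seed: Long): (Seq[T], Seq[T]) = {
  val rng = new Random(seed)
  val lb = (fold - 1).toDouble / k
  val ub = fold.toDouble / k
  val tagged = data.map(x => (x, rng.nextDouble()))
  val test = tagged.collect { case (x, r) if r >= lb && r < ub => x }
  val train = tagged.collect { case (x, r) if r < lb || r >= ub => x }
  (test, train)
}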


/**
* Function to perform cross validation on a single learner.
*
@@ -192,14 +216,14 @@
if (folds <= 1) {
throw new IllegalArgumentException("Cross validation requires more than one fold")
}
val rdds = data.kFoldRdds(folds, seed)
val rdds = kFoldRdds(data, folds, seed)
val errorRates = rdds.map{case (testData, trainingData) =>
val model = learner(trainingData)
val predictions = model.predict(testData.map(_.features))
val errors = predictions.zip(testData.map(_.label)).map{case (x,y) => errorFunction(x,y)}
val predictions = testData.map(data => (data.label, model.predict(data.features)))
val errors = predictions.map{case (x, y) => errorFunction(x, y)}
errors.sum()
}
val averageError = errorRates.sum / data.count
val averageError = errorRates.sum / data.count.toFloat
averageError
}
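
Since every element lands in exactly one test fold, errorRates.sum is the total error over all data points, and dividing by data.count gives the mean per-point error. The call pattern the suite below exercises looks roughly like this (the learner name and argument values are illustrative, not from the patch):

// A learner in the spirit of the suite's exactLearner: weights of 1.0 and a
// zero intercept predict the single feature itself, so with labels equal to
// features the cross-validated error comes out to 0.
val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
  x => LabeledPoint(x._1, Array(x._2)))
val identityLearner = (trainData: RDD[LabeledPoint]) =>
  new LinearRegressionModel(Array(1.0), 0)
val avgError = MLUtils.crossValidate(data, 5, 42, identityLearner)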

41 changes: 28 additions & 13 deletions mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -43,7 +43,6 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
System.clearProperty("spark.driver.port")
}


test("epsilon computation") {
assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.")
@@ -137,20 +136,11 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
new LinearRegressionModel(Array(1.0), 0)
}

test("Test cross validation with a terrible learner") {
val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
x => LabeledPoint(x._1, Array(x._2)))
val expectedError = 1.to(100).map(x => x*x).sum / 100.0
for (seed <- 1 to 5) {
for (folds <- 2 to 5) {
val avgError = MLUtils.crossValidate(data, folds, seed, terribleLearner)
avgError should equal (expectedError)
}
}
}
test("Test cross validation with a reasonable learner") {
val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
x => LabeledPoint(x._1, Array(x._2)))
val features = data.map(_.features)
val labels = data.map(_.label)
for (seed <- 1 to 5) {
for (folds <- 2 to 5) {
val avgError = MLUtils.crossValidate(data, folds, seed, exactLearner)
@@ -163,8 +153,33 @@
val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
x => LabeledPoint(x._1, Array(x._2)))
val thrown = intercept[java.lang.IllegalArgumentException] {
val avgError = MLUtils.crossValidate(data, 1, 1, terribleLearner)
val avgError = MLUtils.crossValidate(data, 1, 1, exactLearner)
}
assert(thrown.getClass === classOf[IllegalArgumentException])
}

test("kfoldRdd") {
val data = sc.parallelize(1 to 100, 2)
val collectedData = data.collect().sorted
val twoFoldedRdd = MLUtils.kFoldRdds(data, 2, 1)
assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted)
assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted)
for (folds <- 2 to 10) {
for (seed <- 1 to 5) {
val foldedRdds = MLUtils.kFoldRdds(data, folds, seed)
assert(foldedRdds.size === folds)
foldedRdds.map{case (test, train) =>
Contributor: Put spaces around {.

val result = test.union(train).collect().sorted
assert(test.collect().size > 0, "Non empty test data")
assert(train.collect().size > 0, "Non empty training data")
Contributor: Minor stuff: the second argument of assert should be an error message. If a test fails with "non-empty training data", I would assume the assertion is that the training data is empty.

assert(result === collectedData,
"Each training+test set combined contains all of the data")
Contributor: Ditto. "Each training+validation set combined should contain all of the data." reads better.

}
// K fold cross validation should only have each element in the test set exactly once
assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted ===
data.collect().sorted)
}
}
}
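
The two invariants this test pins down, that each (test, train) pair partitions the data and that the k test folds together contain every element exactly once, follow directly from the shared-seed construction. A quick standalone check of both, reusing the foldPair sketch from the MLUtils section above, with assertion messages phrased as failure descriptions per the review comments (values are illustrative):

val data = 1 to 100
val k = 5
val pairs = (1 to k).map(i => foldPair(data, i, k, seed = 7))
// Shared seed => identical draws => disjoint, exhaustive test folds.
assert(pairs.flatMap(_._1).sorted == data,
  "expected every element to appear in exactly one test fold")
pairs.foreach { case (test, train) =>
  assert((test ++ train).sorted == data,
    "expected each test/train pair combined to contain all of the data")
}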

}