Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b69f201
Added tunable parallelism to the pyspark implementation of one vs. re…
ajaysaini725 Jun 12, 2017
e750d3e
Fixed python style.
ajaysaini725 Jun 12, 2017
81d458b
Added functionality for tuning parellelism in the Scala implementatio…
ajaysaini725 Jun 13, 2017
2133378
Fixed code according to comments. Added both annotations and unit tes…
ajaysaini725 Jun 13, 2017
c59b1d8
Modified parallel one vs rest to use futures.
ajaysaini725 Jun 22, 2017
5f635a2
Put the parallelism parameter as well as the function for getting an …
ajaysaini725 Jun 23, 2017
4431ffc
Responded to pull request comments.
ajaysaini725 Jun 23, 2017
a841b3e
Made changes based on pull request comments.
ajaysaini725 Jul 6, 2017
a95a8af
Fixed based on pull request comments
ajaysaini725 Jul 14, 2017
d45bc23
Fixed based on comments
ajaysaini725 Jul 18, 2017
30ac62d
Reverting merge and adding change that would fix merge conflict (maki…
ajaysaini725 Jul 19, 2017
cc634d2
Merge branch 'master' into spark-21027
ajaysaini725 Jul 19, 2017
ce14172
Style fix with docstring
ajaysaini725 Jul 20, 2017
1c9de16
Fixed based on comments.
ajaysaini725 Jul 27, 2017
9f34404
Fixed style issue.
ajaysaini725 Jul 27, 2017
585a3f8
Fixed merge conflict
ajaysaini725 Aug 12, 2017
f65381a
Fixed remaining part of merge conflict.
ajaysaini725 Aug 23, 2017
2a335fe
Fixed style problem
ajaysaini725 Aug 23, 2017
049f371
Merge branch 'master' into spark-21027
WeichenXu123 Sep 2, 2017
ddc2ff4
address review feedback issues
WeichenXu123 Sep 3, 2017
fc6fd5e
update migration guide
WeichenXu123 Sep 3, 2017
7d0849e
update desc
WeichenXu123 Sep 6, 2017
edcf85c
fix style
WeichenXu123 Sep 6, 2017
7a1d404
merge master & resolve conflicts
WeichenXu123 Sep 6, 2017
c24d4e2
update out-of-date shared.py
WeichenXu123 Sep 12, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added functionality for tuning parellelism in the Scala implementatio…
…n of the one vs. rest algorithm.
  • Loading branch information
ajaysaini725 committed Jun 13, 2017
commit 81d458be99cf4f195b761eaa9bcb48ea086cdf61
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.util.{List => JList}
import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.existentials

import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -67,7 +69,7 @@ private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTra
def getClassifier: ClassifierType = $(classifier)

val parallelism = new IntParam(this, "parallelism",
"parallelism parameter for tuning amount of parallelism", ParamValidators.gt(1))
"parallelism parameter for tuning amount of parallelism", ParamValidators.gtEq(1))

/** @group getParam */
def getParallelism: Int = $(parallelism)
Expand Down Expand Up @@ -279,6 +281,10 @@ final class OneVsRest @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {

setDefault(
parallelism -> 4
)

@Since("1.4.0")
def this() = this(Identifiable.randomUID("oneVsRest"))

Expand Down Expand Up @@ -337,8 +343,13 @@ final class OneVsRest @Since("1.4.0") (
multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
}

val iters = Range(0, numClasses).par
iters.tasksupport = new ForkJoinTaskSupport(
new ForkJoinPool(getParallelism)
)

// create k columns, one for each binary classifier.
val models = Range(0, numClasses).par.map { index =>
val models = iters.map { index =>
// generate new label metadata for the binary problem.
val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
val labelColName = "mc2b$" + index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,40 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
}

test("one-vs-rest: tuning parallelism produces correct output") {
val numClasses = 3
val ova = new OneVsRest()
.setClassifier(new LogisticRegression)
.setParallelism(8)
assert(ova.getLabelCol === "label")
assert(ova.getPredictionCol === "prediction")
val ovaModel = ova.fit(dataset)

MLTestingUtils.checkCopyAndUids(ova, ovaModel)

assert(ovaModel.models.length === numClasses)
val transformedDataset = ovaModel.transform(dataset)

// check for label metadata in prediction col
val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol)
assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3))

val ovaResults = transformedDataset.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses)
lr.optimizer.setRegParam(0.1).setNumIterations(100)

val model = lr.run(rdd.map(OldLabeledPoint.fromML))
val results = model.predict(rdd.map(p => OldVectors.fromML(p.features))).zip(rdd.map(_.label))
// determine the #confusion matrix in each class.
// bound how much error we allow compared to multinomial logistic regression.
val expectedMetrics = new MulticlassMetrics(results)
val ovaMetrics = new MulticlassMetrics(ovaResults)
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
}

test("one-vs-rest: pass label metadata correctly during train") {
val numClasses = 3
val ova = new OneVsRest()
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/ml/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,13 +1517,13 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):

@keyword_only
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
classifier=None, parallelism=8):
classifier=None, parallelism=4):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
classifier=None)
"""
super(OneVsRest, self).__init__()
self._setDefault(parallelism=8)
self._setDefault(parallelism=4)
kwargs = self._input_kwargs
self._set(**kwargs)

Expand Down