Closed. Changes from 1 commit (25 commits in this pull request):
b69f201
Added tunable parallelism to the pyspark implementation of one vs. re…
ajaysaini725 Jun 12, 2017
e750d3e
Fixed python style.
ajaysaini725 Jun 12, 2017
81d458b
Added functionality for tuning parellelism in the Scala implementatio…
ajaysaini725 Jun 13, 2017
2133378
Fixed code according to comments. Added both annotations and unit tes…
ajaysaini725 Jun 13, 2017
c59b1d8
Modified parallel one vs rest to use futures.
ajaysaini725 Jun 22, 2017
5f635a2
Put the parallelism parameter as well as the function for getting an …
ajaysaini725 Jun 23, 2017
4431ffc
Responded to pull request comments.
ajaysaini725 Jun 23, 2017
a841b3e
Made changes based on pull request comments.
ajaysaini725 Jul 6, 2017
a95a8af
Fixed based on pull request comments
ajaysaini725 Jul 14, 2017
d45bc23
Fixed based on comments
ajaysaini725 Jul 18, 2017
30ac62d
Reverting merge and adding change that would fix merge conflict (maki…
ajaysaini725 Jul 19, 2017
cc634d2
Merge branch 'master' into spark-21027
ajaysaini725 Jul 19, 2017
ce14172
Style fix with docstring
ajaysaini725 Jul 20, 2017
1c9de16
Fixed based on comments.
ajaysaini725 Jul 27, 2017
9f34404
Fixed style issue.
ajaysaini725 Jul 27, 2017
585a3f8
Fixed merge conflict
ajaysaini725 Aug 12, 2017
f65381a
Fixed remaining part of merge conflict.
ajaysaini725 Aug 23, 2017
2a335fe
Fixed style problem
ajaysaini725 Aug 23, 2017
049f371
Merge branch 'master' into spark-21027
WeichenXu123 Sep 2, 2017
ddc2ff4
address review feedback issues
WeichenXu123 Sep 3, 2017
fc6fd5e
update migration guide
WeichenXu123 Sep 3, 2017
7d0849e
update desc
WeichenXu123 Sep 6, 2017
edcf85c
fix style
WeichenXu123 Sep 6, 2017
7a1d404
merge master & resolve conflicts
WeichenXu123 Sep 6, 2017
c24d4e2
update out-of-date shared.py
WeichenXu123 Sep 12, 2017
Fixed code according to comments. Added both annotations and unit tests for testing that parallelism doesn't affect the output.
ajaysaini725 committed Jun 13, 2017
commit 213337882a40c63c3a3ef5741c17a6eebd63df0b
mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -67,12 +67,6 @@ private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTrait

/** @group getParam */
def getClassifier: ClassifierType = $(classifier)

val parallelism = new IntParam(this, "parallelism",
"parallelism parameter for tuning amount of parallelism", ParamValidators.gtEq(1))

/** @group getParam */
def getParallelism: Int = $(parallelism)
}

private[ml] object OneVsRestParams extends ClassifierTypeTrait {
@@ -281,21 +275,34 @@ final class OneVsRest @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {

/**
* param for the number of threads to use when running parallel one vs. rest.
* The implementation of parallel one vs. rest trains the classifier for
* each class as a separate task on a thread pool.
* @group param
*/
@Since("2.3.0")
val parallelism = new IntParam(this, "parallelism",
"the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1))

setDefault(
parallelism -> 4
)

@Since("1.4.0")
def this() = this(Identifiable.randomUID("oneVsRest"))

/** @group getParam */
def getParallelism: Int = $(parallelism)

/** @group setParam */
@Since("1.4.0")
def setClassifier(value: Classifier[_, _, _]): this.type = {
set(classifier, value.asInstanceOf[ClassifierType])
}

/** @group setParam */
@Since("1.4.0")
@Since("2.3.0")
def setParallelism(value: Int): this.type = {
set(parallelism, value)
}
@@ -345,7 +352,7 @@ final class OneVsRest @Since("1.4.0") (

val iters = Range(0, numClasses).par
iters.tasksupport = new ForkJoinTaskSupport(
new ForkJoinPool(getParallelism)
new ForkJoinPool(Math.min(getParallelism, numClasses))
)

// create k columns, one for each binary classifier.
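The core pattern in the Scala hunk above (and mirrored in the PySpark change further down) is to train one binary classifier per class concurrently while capping the worker pool at min(parallelism, numClasses). Below is a minimal, framework-free sketch of that pattern in plain Python, where train_binary_model and fit_one_vs_rest are hypothetical stand-ins for the real Spark code:

from multiprocessing.pool import ThreadPool

def train_binary_model(class_index):
    # Stand-in for "fit the classifier for class `class_index` vs. the rest".
    return "model_for_class_%d" % class_index

def fit_one_vs_rest(num_classes, parallelism=4):
    # Cap the pool: creating more workers than there are classes buys nothing.
    pool = ThreadPool(processes=min(parallelism, num_classes))
    try:
        # One task per class, at most `parallelism` of them running at a time.
        models = pool.map(train_binary_model, range(num_classes))
    finally:
        pool.close()
        pool.join()
    return models

print(fit_one_vs_rest(num_classes=3, parallelism=8))  # pool size is capped at 3

The Scala change achieves the same effect with a parallel collection whose task support is a ForkJoinPool of the capped size.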
mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -101,38 +101,35 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
}

test("one-vs-rest: tuning parallelism produces correct output") {
test("one-vs-rest: tuning parallelism does not change output") {
val numClasses = 3
val ova = new OneVsRest()
val ovaPar2 = new OneVsRest()
.setClassifier(new LogisticRegression)
.setParallelism(8)
assert(ova.getLabelCol === "label")
assert(ova.getPredictionCol === "prediction")
val ovaModel = ova.fit(dataset)
.setParallelism(2)

MLTestingUtils.checkCopyAndUids(ova, ovaModel)

assert(ovaModel.models.length === numClasses)
val transformedDataset = ovaModel.transform(dataset)
val ovaModelPar2 = ovaPar2.fit(dataset)

// check for label metadata in prediction col
val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol)
assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3))
val transformedDatasetPar2 = ovaModelPar2.transform(dataset)

val ovaResults = transformedDataset.select("prediction", "label").rdd.map {
val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses)
lr.optimizer.setRegParam(0.1).setNumIterations(100)
val ovaPar4 = new OneVsRest()
.setClassifier(new LogisticRegression)
.setParallelism(4)

val model = lr.run(rdd.map(OldLabeledPoint.fromML))
val results = model.predict(rdd.map(p => OldVectors.fromML(p.features))).zip(rdd.map(_.label))
// determine the #confusion matrix in each class.
// bound how much error we allow compared to multinomial logistic regression.
val expectedMetrics = new MulticlassMetrics(results)
val ovaMetrics = new MulticlassMetrics(ovaResults)
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
val ovaModelPar4 = ovaPar4.fit(dataset)

val transformedDatasetPar4 = ovaModelPar4.transform(dataset)

val ovaResultsPar4 = transformedDatasetPar4.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
val metricsPar4 = new MulticlassMetrics(ovaResultsPar4)
assert(metricsPar2.confusionMatrix ~== metricsPar4.confusionMatrix absTol 400)
}

test("one-vs-rest: pass label metadata correctly during train") {
31 changes: 17 additions & 14 deletions python/pyspark/ml/classification.py
@@ -1511,8 +1511,9 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):

.. versionadded:: 2.0.0
"""

parallelism = Param(Params._dummy(), "parallelism",
"Number of models to fit in parallel",
"number of processors to use when fitting models in parallel",
typeConverter=TypeConverters.toInt)

@keyword_only
@@ -1538,6 +1539,20 @@ def setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
kwargs = self._input_kwargs
return self._set(**kwargs)

@since("2.3.0")
def setParallelism(self, value):
"""
Sets the value of :py:attr:`parallelism`.
"""
return self._set(parallelism=value)

@since("2.3.0")
def getParallelism(self):
"""
Gets the value of parallelism or its default value.
"""
return self.getOrDefault(self.parallelism)

def _fit(self, dataset):
labelCol = self.getLabelCol()
featuresCol = self.getFeaturesCol()
@@ -1566,7 +1581,7 @@ def trainSingleClass(index):
(classifier.predictionCol, predictionCol)])
return classifier.fit(trainingDataset, paramMap)

pool = ThreadPool(processes=self.getParallelism())
pool = ThreadPool(processes=min(self.getParallelism(), numClasses))

models = pool.map(trainSingleClass, range(numClasses))

@@ -1575,18 +1590,6 @@ def trainSingleClass(index):

return self._copyValues(OneVsRestModel(models=models))

def setParallelism(self, value):
"""
Sets the value of :py:attr:`parallelism`.
"""
return self._set(parallelism=value)

def getParallelism(self):
"""
Gets the value of parallelism or its default value.
"""
return self.getOrDefault(self.parallelism)

@since("2.0.0")
def copy(self, extra=None):
"""
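For reference, a small end-to-end usage sketch of the parameter added here, assuming the PySpark API as it appears in this patch and a local SparkSession; the toy DataFrame mirrors the one used in the updated test below:

from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.master("local[2]").appName("ovr-parallelism").getOrCreate()

df = spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                            (1.0, Vectors.sparse(2, [], [])),
                            (2.0, Vectors.dense(0.5, 0.5))],
                           ["label", "features"])

lr = LogisticRegression(maxIter=5, regParam=0.01)
# Up to 2 binary classifiers are fit concurrently; the thread pool is capped
# at the number of classes, so a larger value would simply be clamped.
ovr = OneVsRest(classifier=lr, parallelism=2)
model = ovr.fit(df)
model.transform(df).select("label", "prediction").show()

spark.stop()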
29 changes: 6 additions & 23 deletions python/pyspark/ml/tests.py
@@ -1234,33 +1234,16 @@ def test_output_columns(self):
output = model.transform(df)
self.assertEqual(output.columns, ["label", "features", "prediction"])


class ParOneVsRestTests(SparkSessionTestCase):

def test_copy(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr, parallelism=8)
ovr1 = ovr.copy({lr.maxIter: 10})
self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
self.assertEqual(ovr1.getClassifier().getMaxIter(), 10)
model = ovr.fit(df)
model1 = model.copy({model.predictionCol: "indexed"})
self.assertEqual(model1.getPredictionCol(), "indexed")

def test_output_columns(self):
def test_parallelism_doesnt_change_output(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr, parallelism=8)
model = ovr.fit(df)
output = model.transform(df)
self.assertEqual(output.columns, ["label", "features", "prediction"])
ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
modelPar2 = ovrPar2.fit(df)
ovrPar4 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=4)
modelPar4 = ovrPar4.fit(df)
self.assertEqual(modelPar2.getPredictionCol(), modelPar4.getPredictionCol())


class HashingTFTest(SparkSessionTestCase):