@@ -17,10 +17,10 @@

package org.apache.spark.ml.classification

import java.util.{List => JList}
import java.util.UUID

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.language.existentials

import org.apache.hadoop.fs.Path
@@ -34,12 +34,13 @@ import org.apache.spark.ml._
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.shared.HasWeightCol
import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ThreadUtils

private[ml] trait ClassifierTypeTrait {
// scalastyle:off structural.type
@@ -273,7 +274,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] {
@Since("1.4.0")
final class OneVsRest @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {
extends Estimator[OneVsRestModel] with OneVsRestParams with HasParallelism with MLWritable {

Contributor: Can we move the HasParallelism into OneVsRestParams?

Contributor: It will be used by cross validator and train validation split, that's why it's shared.

Contributor: Maybe we can define
trait OneVsRestParams extends PredictorParams with HasParallelism
trait TrainValidationSplitParams extends ValidatorParams with HasParallelism
and so on, like other ml algos do? But I am not sure whether there are other reasons.

Member: @WeichenXu123 earlier in the discussion for this PR we had been coordinating with similar changes for parallel cross validator and train validation split at #16774.

If trait OneVsRestParams extends PredictorParams with HasParallelism was done instead, then it would just give the parallelism param to the OneVsRestModel, which isn't really useful since it's already been trained.
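To make the trade-off in this thread concrete, here is a minimal Scala sketch (simplified stand-ins, not the real Spark traits) of why mixing HasParallelism into OneVsRestParams would also expose the param on the fitted model, whereas mixing it into the estimator alone does not:

// Minimal stand-ins (not the real Spark traits) to illustrate the inheritance trade-off.
trait HasParallelism { def getParallelism: Int = 1 }
trait OneVsRestParams // shared by both the estimator and the fitted model

// The alternative discussed above: put HasParallelism on the shared params trait ...
trait OneVsRestParamsAlt extends OneVsRestParams with HasParallelism
class OneVsRestModelSketch extends OneVsRestParamsAlt // the model now carries the param too

// What the PR does instead: mix HasParallelism into the estimator only.
class OneVsRestSketch extends OneVsRestParams with HasParallelism

object InheritanceDemo extends App {
  // A training-only knob leaks onto the already-trained model:
  println(new OneVsRestModelSketch().getParallelism) // compiles, but meaningless after fitting
  // Kept on the estimator, it is only visible where it is actually used:
  println(new OneVsRestSketch().getParallelism)
}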

@Since("1.4.0")
def this() = this(Identifiable.randomUID("oneVsRest"))
@@ -296,6 +297,18 @@ final class OneVsRest @Since("1.4.0") (
@Since("1.5.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)

/** @group expertGetParam */
override def getParallelism: Int = $(parallelism)
Contributor: This one can just go in the trait, right?


/**
* @group expertSetParam
* The implementation of parallel one vs. rest runs the classification for
Contributor: This doc could be a bit better - something like "Set the number of threads used to fit the model for each class in parallel. Default is 1 (fit all models serially)." or similar.

Member: +1

Member: Also, please put the group annotation at the bottom to match existing code style.

* each class in a separate threads.
*/
override def setParallelism(value: Int): this.type = {
set(parallelism, value)
}
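Putting the two review comments above together, those lines might read roughly like this after revision (a sketch of the suggested change, not the committed code):

/**
 * Set the number of threads used to fit the model for each class in parallel.
 * Default is 1 (fit the models serially).
 *
 * @group expertSetParam
 */
override def setParallelism(value: Int): this.type = {
  set(parallelism, value)
}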

/**
* Sets the value of param [[weightCol]].
*
@@ -318,7 +331,7 @@ final class OneVsRest @Since("1.4.0") (
transformSchema(dataset.schema)

val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol)
instr.logParams(labelCol, featuresCol, predictionCol, parallelism)
instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)

// determine number of classes either from metadata if provided, or via computation.
@@ -352,8 +365,10 @@ final class OneVsRest @Since("1.4.0") (
multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
}

val executionContext = getExecutionContext

Member: Might be good to log a message here indicating the parallelism used.

Contributor: Can use instr.log?
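A hedged sketch of the kind of message being suggested, reusing the logNamedValue call already used for the classifier above (parallelism is already covered by instr.logParams, so this would only add a more visible line next to the thread-pool setup):

// Sketch only: surface the effective thread count right before the fits are submitted.
instr.logNamedValue("parallelism", $(parallelism).toString)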

// create k columns, one for each binary classifier.
val models = Range(0, numClasses).par.map { index =>
val modelFutures = Range(0, numClasses).map { index =>
// generate new label metadata for the binary problem.
val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
val labelColName = "mc2b$" + index
@@ -364,14 +379,18 @@
paramMap.put(classifier.labelCol -> labelColName)
paramMap.put(classifier.featuresCol -> getFeaturesCol)
paramMap.put(classifier.predictionCol -> getPredictionCol)
if (weightColIsUsed) {
val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
paramMap.put(classifier_.weightCol -> getWeightCol)
classifier_.fit(trainingDataset, paramMap)
} else {
classifier.fit(trainingDataset, paramMap)
}
}.toArray[ClassificationModel[_, _]]
Future {
if (weightColIsUsed) {
val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
paramMap.put(classifier_.weightCol -> getWeightCol)
classifier_.fit(trainingDataset, paramMap)
} else {
classifier.fit(trainingDataset, paramMap)
}
}(executionContext)
}
val models = modelFutures
.map(ThreadUtils.awaitResult(_, Duration.Inf)).toArray[ClassificationModel[_, _]]
instr.logNumFeatures(models.head.numFeatures)

if (handlePersistence) {
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.param.shared

import scala.concurrent.ExecutionContext

import org.apache.spark.ml.param.{IntParam, Params, ParamValidators}
import org.apache.spark.util.ThreadUtils

/**
* Common parameter for estimators trained in a multithreaded environment.
*/
private[ml] trait HasParallelism extends Params {

/**
* param for the number of threads to use when running parallel meta-algorithms
* @group expertParam
*/
val parallelism = new IntParam(this, "parallelism",
"the number of threads to use when running parallel algorithms", ParamValidators.gtEq(1))

setDefault(parallelism -> 1)

/** @group expertGetParam */
def getParallelism: Int = $(parallelism)

/** @group expertSetParam */
def setParallelism(value: Int): this.type = {
Member: I forgot before: This should go in OneVsRest itself to be Java-friendly.

Member: You can remove this now that it is in OneVsRest.

set(parallelism, value)
}

private[ml] def getExecutionContext: ExecutionContext = {
getParallelism match {
case 1 =>
ThreadUtils.sameThread
case n =>
ExecutionContext.fromExecutorService(ThreadUtils
.newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", n))
}
}
}
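As a usage illustration, here is a condensed sketch of how a meta-estimator mixing in this trait could drive its sub-model fits through the returned context, mirroring the OneVsRest change above (hypothetical code; it assumes it lives inside the Spark source tree, where the private[ml]/private[spark] members are visible):

package org.apache.spark.ml.sketch // hypothetical package, chosen so private[ml] members resolve

import scala.concurrent.Future
import scala.concurrent.duration.Duration

import org.apache.spark.ml.param.shared.HasParallelism
import org.apache.spark.util.ThreadUtils

// Submit one Future per sub-model, then block until all of them finish.
private[ml] trait ParallelFitSketch extends HasParallelism {
  def fitAll[M](tasks: Seq[() => M]): Seq[M] = {
    val executionContext = getExecutionContext
    // With the default parallelism = 1 this is ThreadUtils.sameThread, so each Future
    // body runs on the calling thread and training degenerates to the old serial loop.
    val futures = tasks.map(task => Future { task() }(executionContext))
    futures.map(ThreadUtils.awaitResult(_, Duration.Inf))
  }
}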
@@ -101,6 +101,44 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
}

test("one-vs-rest: tuning parallelism does not change output") {
Contributor: So I know this would be annoying to do - but would it make sense to test that we're actually training the models in parallel? I think we're probably doing it correctly right now, but I could see this accidentally getting screwed up and using the wrong execution context in the future.

Member: Is there a good way to do that? I'm having trouble thinking of ways to do it which would not produce flaky tests.
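One possible approach, sketched here in plain Scala rather than against the ML test API (it is still timing-based, so it may be too flaky for CI, which is the concern raised above): have each task record whether it ever observed another task in flight, a counter that stays at zero when the execution context is effectively single-threaded.

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ConcurrencyProbe {
  def main(args: Array[String]): Unit = {
    // Stand-in for getExecutionContext with parallelism = 2.
    val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    val running = new AtomicInteger(0)    // tasks currently inside the fake "fit"
    val overlapped = new AtomicInteger(0) // tasks that ever saw another task running

    val futures = (0 until 4).map { _ =>
      Future {
        if (running.incrementAndGet() > 1) overlapped.incrementAndGet()
        Thread.sleep(100) // stand-in for the per-class fit
        running.decrementAndGet()
      }(ec)
    }
    futures.foreach(Await.result(_, Duration.Inf))
    ec.shutdown()
    // With a 2-thread pool at least one pair of tasks should overlap; with a
    // single-threaded (or same-thread) context, `overlapped` stays 0.
    assert(overlapped.get() > 0)
  }
}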

val ovaPar1 = new OneVsRest()
.setClassifier(new LogisticRegression)

val ovaModelPar1 = ovaPar1.fit(dataset)

val transformedDatasetPar1 = ovaModelPar1.transform(dataset)

val ovaResultsPar1 = transformedDatasetPar1.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val ovaPar2 = new OneVsRest()
.setClassifier(new LogisticRegression)
.setParallelism(2)

val ovaModelPar2 = ovaPar2.fit(dataset)

val transformedDatasetPar2 = ovaModelPar2.transform(dataset)

val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val metricsPar1 = new MulticlassMetrics(ovaResultsPar1)
val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix)

ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
assert(lrModel1.coefficients === lrModel2.coefficients)
Contributor: Perhaps we should use the approx equal version for vectors and matrices here and above? It seems the test does pass, but perhaps that would be better, to avoid future flakiness for whatever reason. Also, we do so in the Python tests so it would be more consistent.

assert(lrModel1.intercept === lrModel2.intercept)
case other =>
throw new AssertionError(s"Loaded OneVsRestModel expected model of type" +
s" LogisticRegressionModel but found ${other.getClass.getName}")
}
}
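A sketch of the approximate-equality variant suggested above, assuming the ~== syntax from TestingUtils that this suite already uses for the confusion-matrix check (the tolerance is illustrative):

ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
  case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
    assert(lrModel1.coefficients ~== lrModel2.coefficients absTol 1e-4)
    assert(lrModel1.intercept ~== lrModel2.intercept absTol 1e-4)
  case other =>
    throw new AssertionError(s"Loaded OneVsRestModel expected model of type" +
      s" LogisticRegressionModel but found ${other.getClass.getName}")
}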

test("one-vs-rest: pass label metadata correctly during train") {
val numClasses = 3
val ova = new OneVsRest()
python/pyspark/ml/classification.py (23 changes: 14 additions & 9 deletions)
@@ -16,6 +16,7 @@
#

import operator
from multiprocessing.pool import ThreadPool

from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
@@ -1562,7 +1563,7 @@ def getClassifier(self):


@inherit_doc
class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):
"""
.. note:: Experimental

@@ -1607,22 +1608,23 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):

@keyword_only
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
classifier=None, weightCol=None):
classifier=None, weightCol=None, parallelism=1):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
classifier=None, weightCol=None)
classifier=None, weightCol=None, parallelism=1):
"""
super(OneVsRest, self).__init__()
self._setDefault(parallelism=1)
kwargs = self._input_kwargs
self._set(**kwargs)

@keyword_only
@since("2.0.0")
def setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
classifier=None, weightCol=None):
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
classifier=None, weightCol=None, parallelism=1):
"""
setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \
classifier=None, weightCol=None):
classifier=None, weightCol=None, parallelism=1):
Sets params for OneVsRest.
"""
kwargs = self._input_kwargs
@@ -1669,8 +1671,9 @@ def trainSingleClass(index):
paramMap[classifier.weightCol] = weightCol
return classifier.fit(trainingDataset, paramMap)

# TODO: Parallel training for all classes.
models = [trainSingleClass(i) for i in range(numClasses)]
pool = ThreadPool(processes=min(self.getParallelism(), numClasses))
Contributor: We limit the pool here, but not in Scala. Is there a real benefit to limiting it?

Member: In Scala, it doesn't matter because this just sets the max size of the pool and creates threads as needed, so it will never go above numClasses anyway. In Python it's a little unclear what it's doing; the ThreadPool class is not well documented, so I'm not sure if it makes a difference here.

Contributor: So in Scala the thread pool is cached; here we aren't doing that, and I think it's a bit more heavyweight in Python, so we might want to consider whether there is a reasonable way to reuse it (if not, that's probably OK too, since this overhead pales in comparison to training serially).


models = pool.map(trainSingleClass, range(numClasses))
Contributor: So if the training of the models takes different times and there is a relatively large number of models, we might end up blocking a worker here, since the map effectively splits the work in advance. What do you think of providing a chunksize hint, or using imap or imap_unordered instead? The overhead of sending each element to the worker should be relatively small compared to the time it takes to train the model.


if handlePersistence:
multiclassLabeled.unpersist()
@@ -1704,8 +1707,9 @@ def _from_java(cls, java_stage):
labelCol = java_stage.getLabelCol()
predictionCol = java_stage.getPredictionCol()
classifier = JavaParams._from_java(java_stage.getClassifier())
parallelism = java_stage.getParallelism()
py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
classifier=classifier)
classifier=classifier, parallelism=parallelism)
py_stage._resetUid(java_stage.uid())
return py_stage

@@ -1718,6 +1722,7 @@ def _to_java(self):
_java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest",
self.uid)
_java_obj.setClassifier(self.getClassifier()._to_java())
_java_obj.setParallelism(self.getParallelism())
_java_obj.setFeaturesCol(self.getFeaturesCol())
_java_obj.setLabelCol(self.getLabelCol())
_java_obj.setPredictionCol(self.getPredictionCol())
python/pyspark/ml/param/_shared_params_code_gen.py (2 changes: 2 additions & 0 deletions)
@@ -152,6 +152,8 @@ def get$Name(self):
("varianceCol", "column name for the biased sample variance of prediction.",
None, "TypeConverters.toString"),
("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
"TypeConverters.toInt"),
("parallelism", "number of threads to use when fitting models in parallel.", "1",
Member: This should probably indicate that the value range is "(>= 1)" at the end of the description.

"TypeConverters.toInt")]

code = []
python/pyspark/ml/param/shared.py (24 changes: 24 additions & 0 deletions)
@@ -608,6 +608,30 @@ def getAggregationDepth(self):
return self.getOrDefault(self.aggregationDepth)


class HasParallelism(Params):
"""
Mixin for param parallelism: number of threads to use when fitting models in parallel.
"""

parallelism = Param(Params._dummy(), "parallelism", "number of threads to use when fitting models in parallel.", typeConverter=TypeConverters.toInt)

def __init__(self):
super(HasParallelism, self).__init__()
self._setDefault(parallelism=1)

def setParallelism(self, value):
"""
Sets the value of :py:attr:`parallelism`.
"""
return self._set(parallelism=value)

def getParallelism(self):
"""
Gets the value of parallelism or its default value.
"""
return self.getOrDefault(self.parallelism)


class DecisionTreeParams(Params):
"""
Mixin for Decision Tree parameters.
python/pyspark/ml/tests.py (16 changes: 15 additions & 1 deletion)
@@ -1454,11 +1454,25 @@ def test_output_columns(self):
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
ovr = OneVsRest(classifier=lr, parallelism=1)
Member: Same as above.

model = ovr.fit(df)
output = model.transform(df)
self.assertEqual(output.columns, ["label", "features", "prediction"])

def test_parallelism_doesnt_change_output(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1)
modelPar1 = ovrPar1.fit(df)
ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
modelPar2 = ovrPar2.fit(df)
for i, model in enumerate(modelPar1.models):
self.assertTrue(np.allclose(model.coefficients.toArray(),
modelPar2.models[i].coefficients.toArray(), atol=1E-4))
self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))

def test_support_for_weightCol(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0),
(1.0, Vectors.sparse(2, [], []), 1.0),