diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 05b8c3ab5456..ad08d572a0b2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.ml.classification
 
-import java.util.{List => JList}
 import java.util.UUID
 
-import scala.collection.JavaConverters._
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
 import scala.language.existentials
 
 import org.apache.hadoop.fs.Path
@@ -34,12 +34,13 @@ import org.apache.spark.ml._
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
-import org.apache.spark.ml.param.shared.HasWeightCol
+import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.ThreadUtils
 
 private[ml] trait ClassifierTypeTrait {
   // scalastyle:off structural.type
@@ -273,7 +274,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] {
 @Since("1.4.0")
 final class OneVsRest @Since("1.4.0") (
     @Since("1.4.0") override val uid: String)
-  extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {
+  extends Estimator[OneVsRestModel] with OneVsRestParams with HasParallelism with MLWritable {
 
   @Since("1.4.0")
   def this() = this(Identifiable.randomUID("oneVsRest"))
@@ -296,6 +297,18 @@ final class OneVsRest @Since("1.4.0") (
   @Since("1.5.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, value)
 
+  /** @group expertGetParam */
+  override def getParallelism: Int = $(parallelism)
+
+  /**
+   * The implementation of parallel one vs. rest runs the classification for
+   * each class in a separate thread.
+   * @group expertSetParam
+   */
+  override def setParallelism(value: Int): this.type = {
+    set(parallelism, value)
+  }
+
   /**
    * Sets the value of param [[weightCol]].
    *
@@ -318,7 +331,7 @@ final class OneVsRest @Since("1.4.0") (
     transformSchema(dataset.schema)
 
     val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, predictionCol)
+    instr.logParams(labelCol, featuresCol, predictionCol, parallelism)
     instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)
 
     // determine number of classes either from metadata if provided, or via computation.
@@ -352,8 +365,10 @@ final class OneVsRest @Since("1.4.0") (
       multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
     }
 
+    val executionContext = getExecutionContext
+
     // create k columns, one for each binary classifier.
-    val models = Range(0, numClasses).par.map { index =>
+    val modelFutures = Range(0, numClasses).map { index =>
       // generate new label metadata for the binary problem.
       val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
       val labelColName = "mc2b$" + index
@@ -364,14 +379,18 @@ final class OneVsRest @Since("1.4.0") (
       paramMap.put(classifier.labelCol -> labelColName)
       paramMap.put(classifier.featuresCol -> getFeaturesCol)
       paramMap.put(classifier.predictionCol -> getPredictionCol)
-      if (weightColIsUsed) {
-        val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
-        paramMap.put(classifier_.weightCol -> getWeightCol)
-        classifier_.fit(trainingDataset, paramMap)
-      } else {
-        classifier.fit(trainingDataset, paramMap)
-      }
-    }.toArray[ClassificationModel[_, _]]
+      Future {
+        if (weightColIsUsed) {
+          val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
+          paramMap.put(classifier_.weightCol -> getWeightCol)
+          classifier_.fit(trainingDataset, paramMap)
+        } else {
+          classifier.fit(trainingDataset, paramMap)
+        }
+      }(executionContext)
+    }
+    val models = modelFutures
+      .map(ThreadUtils.awaitResult(_, Duration.Inf)).toArray[ClassificationModel[_, _]]
     instr.logNumFeatures(models.head.numFeatures)
 
     if (handlePersistence) {
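The change above replaces `Range(0, numClasses).par.map` (which drew threads from Scala's default ForkJoin pool) with explicit `Future`s submitted to an execution context sized by the new `parallelism` param, then awaits them in class order. A minimal standalone sketch of the same fan-out/await pattern, using a plain `java.util.concurrent` pool in place of Spark's internal `ThreadUtils` helpers (the object name, task bodies, and pool size here are illustrative, not part of the patch):

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FanOutSketch {
  def main(args: Array[String]): Unit = {
    // Bounded pool, standing in for ThreadUtils.newDaemonCachedThreadPool(prefix, n).
    val pool = Executors.newFixedThreadPool(2)
    val ec = ExecutionContext.fromExecutorService(pool)

    // Fan out one task per class, as fit() now does for the binary classifiers.
    val futures = Range(0, 4).map { index =>
      Future {
        s"model for class $index" // placeholder for classifier.fit(...)
      }(ec)
    }

    // Block until every task finishes, preserving class order; the patch uses
    // ThreadUtils.awaitResult, a thin Spark wrapper around Await.result.
    val models = futures.map(Await.result(_, Duration.Inf))
    models.foreach(println)
    pool.shutdown()
  }
}
```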
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala
new file mode 100644
index 000000000000..cf5a5a89c822
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.param.shared
+
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.ml.param.{IntParam, Params, ParamValidators}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Common parameter for estimators trained in a multithreaded environment.
+ */
+private[ml] trait HasParallelism extends Params {
+
+  /**
+   * param for the number of threads to use when running parallel meta-algorithms
+   * @group expertParam
+   */
+  val parallelism = new IntParam(this, "parallelism",
+    "the number of threads to use when running parallel algorithms", ParamValidators.gtEq(1))
+
+  setDefault(parallelism -> 1)
+
+  /** @group expertGetParam */
+  def getParallelism: Int = $(parallelism)
+
+  /** @group expertSetParam */
+  def setParallelism(value: Int): this.type = {
+    set(parallelism, value)
+  }
+
+  private[ml] def getExecutionContext: ExecutionContext = {
+    getParallelism match {
+      case 1 =>
+        ThreadUtils.sameThread
+      case n =>
+        ExecutionContext.fromExecutorService(ThreadUtils
+          .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", n))
+    }
+  }
+}
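With the default `parallelism = 1`, `getExecutionContext` returns `ThreadUtils.sameThread`, so each `Future` body runs inline on the calling thread and training stays sequential, with no pool creation or handoff. `ThreadUtils` is Spark-internal; a rough standalone equivalent of that same-thread context (sketch only, not the patch's code) looks like:

```scala
import scala.concurrent.ExecutionContext

object SameThreadSketch {
  // Runs every submitted task immediately on the submitting thread.
  val sameThread: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }
}
```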
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
index 17f82827b74e..ff8cdc02fbac 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -101,6 +101,44 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
     assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
   }
 
+  test("one-vs-rest: tuning parallelism does not change output") {
+    val ovaPar1 = new OneVsRest()
+      .setClassifier(new LogisticRegression)
+
+    val ovaModelPar1 = ovaPar1.fit(dataset)
+
+    val transformedDatasetPar1 = ovaModelPar1.transform(dataset)
+
+    val ovaResultsPar1 = transformedDatasetPar1.select("prediction", "label").rdd.map {
+      row => (row.getDouble(0), row.getDouble(1))
+    }
+
+    val ovaPar2 = new OneVsRest()
+      .setClassifier(new LogisticRegression)
+      .setParallelism(2)
+
+    val ovaModelPar2 = ovaPar2.fit(dataset)
+
+    val transformedDatasetPar2 = ovaModelPar2.transform(dataset)
+
+    val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map {
+      row => (row.getDouble(0), row.getDouble(1))
+    }
+
+    val metricsPar1 = new MulticlassMetrics(ovaResultsPar1)
+    val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
+    assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix)
+
+    ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
+      case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
+        assert(lrModel1.coefficients === lrModel2.coefficients)
+        assert(lrModel1.intercept === lrModel2.intercept)
+      case other =>
+        throw new AssertionError("OneVsRestModel expected models of type" +
+          s" LogisticRegressionModel but found ${other.getClass.getName}")
+    }
+  }
+
   test("one-vs-rest: pass label metadata correctly during train") {
     val numClasses = 3
     val ova = new OneVsRest()
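From a user's perspective the new knob is a one-line addition; a usage sketch mirroring the test above (assuming an existing `SparkSession` and a labeled `training` DataFrame, both placeholders):

```scala
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

// Fit the per-class binary models two at a time; per the test above, the
// result is identical to the sequential parallelism = 1 default.
val ovr = new OneVsRest()
  .setClassifier(new LogisticRegression().setMaxIter(10))
  .setParallelism(2)
val ovrModel = ovr.fit(training) // `training`: DataFrame with label/features columns
```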
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index bccf8e7f636f..43c722e2124d 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -16,6 +16,7 @@
 #
 
 import operator
+from multiprocessing.pool import ThreadPool
 
 from pyspark import since, keyword_only
 from pyspark.ml import Estimator, Model
@@ -1562,7 +1563,7 @@ def getClassifier(self):
 
 
 @inherit_doc
-class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
+class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):
     """
     .. note:: Experimental
 
@@ -1607,22 +1608,23 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
 
     @keyword_only
    def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
-                 classifier=None, weightCol=None):
+                 classifier=None, weightCol=None, parallelism=1):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
-                 classifier=None, weightCol=None)
+                 classifier=None, weightCol=None, parallelism=1)
         """
         super(OneVsRest, self).__init__()
+        self._setDefault(parallelism=1)
         kwargs = self._input_kwargs
         self._set(**kwargs)
 
     @keyword_only
     @since("2.0.0")
-    def setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
-                  classifier=None, weightCol=None):
+    def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+                  classifier=None, weightCol=None, parallelism=1):
         """
-        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \
-                  classifier=None, weightCol=None):
+        setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+                  classifier=None, weightCol=None, parallelism=1):
         Sets params for OneVsRest.
         """
         kwargs = self._input_kwargs
@@ -1669,8 +1671,9 @@ def trainSingleClass(index):
                 paramMap[classifier.weightCol] = weightCol
             return classifier.fit(trainingDataset, paramMap)
 
-        # TODO: Parallel training for all classes.
-        models = [trainSingleClass(i) for i in range(numClasses)]
+        pool = ThreadPool(processes=min(self.getParallelism(), numClasses))
+
+        models = pool.map(trainSingleClass, range(numClasses))
 
         if handlePersistence:
             multiclassLabeled.unpersist()
@@ -1704,8 +1707,9 @@ def _from_java(cls, java_stage):
         labelCol = java_stage.getLabelCol()
         predictionCol = java_stage.getPredictionCol()
         classifier = JavaParams._from_java(java_stage.getClassifier())
+        parallelism = java_stage.getParallelism()
         py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
-                       classifier=classifier)
+                       classifier=classifier, parallelism=parallelism)
         py_stage._resetUid(java_stage.uid())
         return py_stage
 
@@ -1718,6 +1722,7 @@ def _to_java(self):
         _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest",
                                              self.uid)
         _java_obj.setClassifier(self.getClassifier()._to_java())
+        _java_obj.setParallelism(self.getParallelism())
         _java_obj.setFeaturesCol(self.getFeaturesCol())
         _java_obj.setLabelCol(self.getLabelCol())
         _java_obj.setPredictionCol(self.getPredictionCol())
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index 51d49b524c32..34a1773d34bf 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -152,6 +152,8 @@ def get$Name(self):
     ("varianceCol", "column name for the biased sample variance of prediction.",
      None, "TypeConverters.toString"),
     ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
+     "TypeConverters.toInt"),
+    ("parallelism", "number of threads to use when fitting models in parallel.", "1",
      "TypeConverters.toInt")]
 
 code = []
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 163a0e2b3a96..5c84b32255de 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -608,6 +608,30 @@ def getAggregationDepth(self):
         return self.getOrDefault(self.aggregationDepth)
 
 
+class HasParallelism(Params):
+    """
+    Mixin for param parallelism: number of threads to use when fitting models in parallel.
+    """
+
+    parallelism = Param(Params._dummy(), "parallelism", "number of threads to use when fitting models in parallel.", typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(HasParallelism, self).__init__()
+        self._setDefault(parallelism=1)
+
+    def setParallelism(self, value):
+        """
+        Sets the value of :py:attr:`parallelism`.
+        """
+        return self._set(parallelism=value)
+
+    def getParallelism(self):
+        """
+        Gets the value of parallelism or its default value.
+        """
+        return self.getOrDefault(self.parallelism)
+
+
 class DecisionTreeParams(Params):
     """
     Mixin for Decision Tree parameters.
+ """ + + parallelism = Param(Params._dummy(), "parallelism", "number of threads to use when fitting models in parallel.", typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasParallelism, self).__init__() + self._setDefault(parallelism=1) + + def setParallelism(self, value): + """ + Sets the value of :py:attr:`parallelism`. + """ + return self._set(parallelism=value) + + def getParallelism(self): + """ + Gets the value of parallelism or its default value. + """ + return self.getOrDefault(self.parallelism) + + class DecisionTreeParams(Params): """ Mixin for Decision Tree parameters. diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 6aecc7fe8707..d36b949f979b 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1454,11 +1454,25 @@ def test_output_columns(self): (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr) + ovr = OneVsRest(classifier=lr, parallelism=1) model = ovr.fit(df) output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "prediction"]) + def test_parallelism_doesnt_change_output(self): + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1) + modelPar1 = ovrPar1.fit(df) + ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2) + modelPar2 = ovrPar2.fit(df) + for i, model in enumerate(modelPar1.models): + self.assertTrue(np.allclose(model.coefficients.toArray(), + modelPar2.models[i].coefficients.toArray(), atol=1E-4)) + self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4)) + def test_support_for_weightCol(self): df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0), (1.0, Vectors.sparse(2, [], []), 1.0),