Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ private[shared] object SharedParamsCodeGen {
"all instance weights as 1.0"),
ParamDesc[String]("solver", "the solver algorithm for optimization", finalFields = false),
ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"),
isValid = "ParamValidators.gtEq(2)", isExpertParam = true))
isValid = "ParamValidators.gtEq(2)", isExpertParam = true),
ParamDesc[Boolean]("collectSubModels", "If set to false, then only the single best " +
"sub-model will be available after fitting. If set to true, then all sub-models will be " +
"available. Warning: For large models, collecting all sub-models can cause OOMs on the " +
"Spark driver.",
Some("false"), isExpertParam = true)
)

val code = genSharedParams(params)
val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,4 +402,21 @@ private[ml] trait HasAggregationDepth extends Params {
/** @group expertGetParam */
final def getAggregationDepth: Int = $(aggregationDepth)
}

/**
* Trait for shared param collectSubModels (default: false).
*/
private[ml] trait HasCollectSubModels extends Params {

/**
* Param for whether to collect a list of sub-models trained during tuning.
* @group expertParam
*/
final val collectSubModels: BooleanParam = new BooleanParam(this, "collectSubModels", "whether to collect a list of sub-models trained during tuning")

setDefault(collectSubModels, false)

/** @group expertGetParam */
final def getCollectSubModels: Boolean = $(collectSubModels)
}
// scalastyle:on
110 changes: 101 additions & 9 deletions mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.ml.tuning

import java.util.{List => JList}
import java.util.{List => JList, Locale}

import scala.collection.JavaConverters._
import scala.concurrent.Future
Expand All @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.HasParallelism
import org.apache.spark.ml.param.shared.{HasCollectSubModels, HasParallelism}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{DataFrame, Dataset}
Expand Down Expand Up @@ -67,7 +67,8 @@ private[ml] trait CrossValidatorParams extends ValidatorParams {
@Since("1.2.0")
class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
extends Estimator[CrossValidatorModel]
with CrossValidatorParams with HasParallelism with MLWritable with Logging {
with CrossValidatorParams with HasParallelism with HasCollectSubModels
with MLWritable with Logging {

@Since("1.2.0")
def this() = this(Identifiable.randomUID("cv"))
Expand Down Expand Up @@ -101,6 +102,10 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
@Since("2.3.0")
def setParallelism(value: Int): this.type = set(parallelism, value)

/** @group expertSetParam */
@Since("2.3.0")
def setCollectSubModels(value: Boolean): this.type = set(collectSubModels, value)

@Since("2.0.0")
override def fit(dataset: Dataset[_]): CrossValidatorModel = {
val schema = dataset.schema
Expand All @@ -117,6 +122,12 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
instr.logParams(numFolds, seed, parallelism)
logTuningParams(instr)

val collectSubModelsParam = $(collectSubModels)

var subModels: Option[Array[Array[Model[_]]]] = if (collectSubModelsParam) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this var seems unnecessary, could we just it seems like we'd be better by just collecting modelFutures in copy values (then we can avoid the mutation on L145)

Copy link
Contributor Author

@WeichenXu123 WeichenXu123 Nov 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk @jkbradley I already thought about this issue. The reason I use this way is:

  1. The modelFutures and foldMetricFutures will be executed in pipelined way, when $(collectSubModels) == false, this will make sure that the model generated in modelFutures will be released in time, so that the maximum memory cost will be numParallelism * sizeof(model). If we use the way of "collecting modelFutures", it will increase the memory cost to be $(estimatorParamMaps).length * sizeof(model) . This is a serious issue which is discussed before.
  2. IMO the mutation on L145 won't influence performance. and it do not need something like lock, there is no race condition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow with #1, if we keep all the models (e.g. set collectSubModelsParam) then the maximum memory cost will be $(estimatorParamMaps).length * sizeof(model) in either case? If we don't keep the models (e.g. set collectSubModelsParam to false) then you don't have to collect the future back at the end and there is no additional overhead.

For #2, It's not that mutation impacts performance, its that it makes the code less easy to reason about for no gain (unless I've misunderstood something about part 1).

Copy link
Contributor Author

@WeichenXu123 WeichenXu123 Nov 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk Oh, sorry for confusing you. Yes, if set collectSubModelsParam the memory cost will always be $(estimatorParamMaps).length * sizeof(model). According to your suggestion, we have to duplicate code logic (but if i am wrong correct me):

  • When set collectSubModelsParam, we cannot pipeline modelFutures and foldMetricFutures, we should execute modelFutures and collect results first, and modify foldMetricFutures logic, change it into something like following:
val foldMetricFutures = modelResults.zip(epm).map { case (model, paramMap) =>
       Future[Double] {
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
      } (executionContext)
  • When not set collectSubModelsParam, just keep current modelFutures & foldMetricFutures and pipeline them to execute. (Only pipeline them we can save memory cost to numParallelism * sizeof(model).

So, according to your suggestion, it seems need more code. So do you still prefer this way ? Or do you have better way to implement that ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't follow up on this before. I think that @WeichenXu123 's argument is valid, but please say if there are issues I'm missing @holdenk

Some(Array.fill($(numFolds))(Array.fill[Model[_]](epm.length)(null)))
} else None

// Compute metrics for each model over each split
val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed))
val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) =>
Expand All @@ -125,10 +136,14 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
logDebug(s"Train split $splitIndex with multiple sets of parameters.")

// Fit models in a Future for training in parallel
val modelFutures = epm.map { paramMap =>
val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
Future[Model[_]] {
val model = est.fit(trainingDataset, paramMap)
model.asInstanceOf[Model[_]]
val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]

if (collectSubModelsParam) {
subModels.get(splitIndex)(paramIndex) = model
}
model
} (executionContext)
}

Expand Down Expand Up @@ -160,7 +175,8 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
logInfo(s"Best cross-validation metric: $bestMetric.")
val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]]
instr.logSuccess(bestModel)
copyValues(new CrossValidatorModel(uid, bestModel, metrics).setParent(this))
copyValues(new CrossValidatorModel(uid, bestModel, metrics)
.setSubModels(subModels).setParent(this))
}

@Since("1.4.0")
Expand Down Expand Up @@ -244,6 +260,21 @@ class CrossValidatorModel private[ml] (
this(uid, bestModel, avgMetrics.asScala.toArray)
}

private var _subModels: Option[Array[Array[Model[_]]]] = None

@Since("2.3.0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only use Since annotations for public APIs

private[tuning] def setSubModels(subModels: Option[Array[Array[Model[_]]]])
: CrossValidatorModel = {
_subModels = subModels
this
}

@Since("2.3.0")
def subModels: Array[Array[Model[_]]] = _subModels.get
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add Scala doc. We'll need to explain what the inner and outer array are and which one corresponds to the ordering of estimatorParamsMaps.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can you please add a better Exception message? If submodels are not available, then we should tell users to set the collectSubModels Param before fitting.


@Since("2.3.0")
def hasSubModels: Boolean = _subModels.isDefined

@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
Expand All @@ -260,7 +291,8 @@ class CrossValidatorModel private[ml] (
val copied = new CrossValidatorModel(
uid,
bestModel.copy(extra).asInstanceOf[Model[_]],
avgMetrics.clone())
avgMetrics.clone()
).setSubModels(CrossValidatorModel.copySubModels(_subModels))
copyValues(copied, extra).setParent(parent)
}

Expand All @@ -271,6 +303,20 @@ class CrossValidatorModel private[ml] (
@Since("1.6.0")
object CrossValidatorModel extends MLReadable[CrossValidatorModel] {

private[CrossValidatorModel] def copySubModels(subModels: Option[Array[Array[Model[_]]]]) = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: state return value explicitly

subModels.map { subModels =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be simplified using map?

subModels.map(_.map(_.map(_.copy(...).asInstanceOf[...])))

val numFolds = subModels.length
val numParamMaps = subModels(0).length
val copiedSubModels = Array.fill(numFolds)(Array.fill[Model[_]](numParamMaps)(null))
for (i <- 0 until numFolds) {
for (j <- 0 until numParamMaps) {
copiedSubModels(i)(j) = subModels(i)(j).copy(ParamMap.empty).asInstanceOf[Model[_]]
}
}
copiedSubModels
}
}

@Since("1.6.0")
override def read: MLReader[CrossValidatorModel] = new CrossValidatorModelReader

Expand All @@ -282,12 +328,40 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] {

ValidatorParams.validateParams(instance)

protected var shouldPersistSubModels: Boolean = if (instance.hasSubModels) true else false

/**
* Extra options for CrossValidatorModelWriter, current support "persistSubModels".
* if sub models exsit, the default value for option "persistSubModels" is "true".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: exsit -> exist

*/
@Since("2.3.0")
override def option(key: String, value: String): this.type = {
key.toLowerCase(Locale.ROOT) match {
case "persistsubmodels" => shouldPersistSubModels = value.toBoolean
case _ => throw new IllegalArgumentException(
s"Illegal option ${key} for CrossValidatorModelWriter")
}
this
}

override protected def saveImpl(path: String): Unit = {
import org.json4s.JsonDSL._
val extraMetadata = "avgMetrics" -> instance.avgMetrics.toSeq
val extraMetadata = ("avgMetrics" -> instance.avgMetrics.toSeq) ~
("shouldPersistSubModels" -> shouldPersistSubModels)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have 1 name for this argument: "persistSubModels"

ValidatorParams.saveImpl(path, instance, sc, Some(extraMetadata))
val bestModelPath = new Path(path, "bestModel").toString
instance.bestModel.asInstanceOf[MLWritable].save(bestModelPath)
if (shouldPersistSubModels) {
require(instance.hasSubModels, "Cannot get sub models to persist.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message may be unclear. How about adding: "When persisting tuning models, you can only set persistSubModels to true if the tuning was done with collectSubModels set to true. To save the sub-models, try rerunning fitting with collectSubModels set to true."

val subModelsPath = new Path(path, "subModels")
for (splitIndex <- 0 until instance.getNumFolds) {
val splitPath = new Path(subModelsPath, splitIndex.toString)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about naming this with the string "fold":
splitIndex.toString --> "fold" + splitIndex.toString?

for (paramIndex <- 0 until instance.getEstimatorParamMaps.length) {
val modelPath = new Path(splitPath, paramIndex.toString).toString
instance.subModels(splitIndex)(paramIndex).asInstanceOf[MLWritable].save(modelPath)
}
}
}
}
}

Expand All @@ -301,11 +375,29 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] {

val (metadata, estimator, evaluator, estimatorParamMaps) =
ValidatorParams.loadImpl(path, sc, className)
val numFolds = (metadata.params \ "numFolds").extract[Int]
val bestModelPath = new Path(path, "bestModel").toString
val bestModel = DefaultParamsReader.loadParamsInstance[Model[_]](bestModelPath, sc)
val avgMetrics = (metadata.metadata \ "avgMetrics").extract[Seq[Double]].toArray
val shouldPersistSubModels = (metadata.metadata \ "shouldPersistSubModels").extract[Boolean]

val subModels: Option[Array[Array[Model[_]]]] = if (shouldPersistSubModels) {
val subModelsPath = new Path(path, "subModels")
val _subModels = Array.fill(numFolds)(Array.fill[Model[_]](
estimatorParamMaps.length)(null))
for (splitIndex <- 0 until numFolds) {
val splitPath = new Path(subModelsPath, splitIndex.toString)
for (paramIndex <- 0 until estimatorParamMaps.length) {
val modelPath = new Path(splitPath, paramIndex.toString).toString
_subModels(splitIndex)(paramIndex) =
DefaultParamsReader.loadParamsInstance(modelPath, sc)
}
}
Some(_subModels)
} else None

val model = new CrossValidatorModel(metadata.uid, bestModel, avgMetrics)
.setSubModels(subModels)
model.set(model.estimator, estimator)
.set(model.evaluator, evaluator)
.set(model.estimatorParamMaps, estimatorParamMaps)
Expand Down
Loading