Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5650e98
Changed CrossValidator and TrainValidationSplit fit methods to evalua…
BryanCutler Jan 31, 2017
36a1a68
made closure vars more explicit, moved param default to trait
BryanCutler Feb 14, 2017
b051afa
added paramvalidator for numParallelEval to ensure >=1
BryanCutler Feb 14, 2017
46fe252
added test cases for CrossValidation and TrainValidationSplit
BryanCutler Feb 15, 2017
1274ba4
added numParallelEval param usage to examples
BryanCutler Feb 15, 2017
80ac2fd
added documentation to ml-tuning
BryanCutler Feb 16, 2017
8126710
changed sliding window limit to use a semaphore instead to prevent wa…
BryanCutler Feb 16, 2017
6a9b735
added note about parallelism capped by Scala collection thread pool, …
BryanCutler Feb 16, 2017
1c2e391
reworked to use ExecutorService and Futures
BryanCutler Feb 28, 2017
9e055cd
fixed wildcard import
BryanCutler Feb 28, 2017
97ad7b4
made doc changes
BryanCutler Apr 11, 2017
5e8a086
changed ExecutorService factory to a trait to be compatible with Java
BryanCutler Apr 12, 2017
864c99c
Merge remote-tracking branch 'upstream/master' into parallel-model-ev…
BryanCutler Jun 13, 2017
ad8a870
Changed ExecutorService to be set explicitly instead of factory
BryanCutler Jun 14, 2017
911af1d
added HasParallelism trait
BryanCutler Aug 23, 2017
658aacb
Updated to use Trait HasParallelsim
BryanCutler Aug 23, 2017
2c73b0b
fixed up docs
BryanCutler Aug 23, 2017
7a8221b
removed blas calculation for CrossValidator metric calc, was not nece…
BryanCutler Sep 5, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Changed CrossValidator and TrainValidationSplit fit methods to evalua…
…te models in paralell
  • Loading branch information
BryanCutler committed Jan 31, 2017
commit 5650e98a580544303dc1185568be992d9304707a
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private[ml] trait CrossValidatorParams extends ValidatorParams {
/** @group getParam */
def getNumFolds: Int = $(numFolds)

setDefault(numFolds -> 3)
setDefault(numFolds -> 3, numParallelEval -> 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May make more sense to put the setDefault call in the parent trait ValidatorParams

}

/**
Expand Down Expand Up @@ -91,6 +91,10 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
@Since("2.0.0")
def setSeed(value: Long): this.type = set(seed, value)

/** @group setParam */
@Since("2.2.0")
def setNumParallelEval(value: Int): this.type = set(numParallelEval, value)

@Since("2.0.0")
override def fit(dataset: Dataset[_]): CrossValidatorModel = {
val schema = dataset.schema
Expand All @@ -100,31 +104,44 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
val eval = $(evaluator)
val epm = $(estimatorParamMaps)
val numModels = epm.length
val metrics = new Array[Double](epm.length)
val numPar = $(numParallelEval)

val instr = Instrumentation.create(this, dataset)
instr.logParams(numFolds, seed)
logTuningParams(instr)

// Compute metrics for each model over each fold
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically it's each estimator rather than model - since both the fitting (estimator or pipeline) and evaluation (model or pipeline model) is in parallel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I was just referencing the general process that a metric is computed by evaluating the model predictions on the validation set, and that this is done at each fold. I don't think that has any bearing to the concurrency here, but I added this just because the process is a little more complicated with the addition of a sliding window...

logDebug(s"Running cross-validation with level of parallelism: $numPar.")
val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed))
splits.zipWithIndex.foreach { case ((training, validation), splitIndex) =>
val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) =>
val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
// multi-model training
logDebug(s"Train split $splitIndex with multiple sets of parameters.")
val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]

// Fit models concurrently, limited by using a sliding window over models
val models = epm.grouped(numPar).map { win =>
win.par.map(est.fit(trainingDataset, _))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to be a bit more verbose here and use a closure variable like paramMap (or epm to follow existing impl) instead of _ just be clear & more readable.

}.toList.flatten.asInstanceOf[Seq[Model[_]]]
trainingDataset.unpersist()
var i = 0
while (i < numModels) {
// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)))
logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
metrics(i) += metric
i += 1
}

// Evaluate models concurrently, limited by using a sliding window over models
val foldMetrics = models.zip(epm).grouped(numPar).map { win =>
win.par.map { m =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to use a case here to be more clear, so .map { case (model, paramMap) => or similar.

// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(m._1.transform(validationDataset, m._2))
logDebug(s"Got metric $metric for model trained with ${m._2}.")
metric
}
}.toList.flatten

validationDataset.unpersist()
}
foldMetrics
}.reduce((mA, mB) => mA.zip(mB).map(m => m._1 + m._2)).toArray

// Calculate average metric for all folds
f2jBLAS.dscal(numModels, 1.0 / $(numFolds), metrics, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this line code because it use low-level api which make the code difficult to read.
And here is not bottleneck.
So I think we can simply use:

val metrics = ...
   ....
   .transpose.map(_.sum / $(numFolds))

instead.
What do you think about it ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must say I wondered why the author of this bothered with a BLAS call to do the computation... even if there were a few 1000s param combinations it would still not be significant overhead. +1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. I'll go ahead and make that change here.


logInfo(s"Average cross-validation metrics: ${metrics.toSeq}")
val (bestMetric, bestIndex) =
if (eval.isLargerBetter) metrics.zipWithIndex.maxBy(_._1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[ml] trait TrainValidationSplitParams extends ValidatorParams {
/** @group getParam */
def getTrainRatio: Double = $(trainRatio)

setDefault(trainRatio -> 0.75)
setDefault(trainRatio -> 0.75, numParallelEval -> 1)
}

/**
Expand Down Expand Up @@ -87,15 +87,19 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
@Since("2.0.0")
def setSeed(value: Long): this.type = set(seed, value)

/** @group setParam */
@Since("2.2.0")
def setNumParallelEval(value: Int): this.type = set(numParallelEval, value)

@Since("2.0.0")
override def fit(dataset: Dataset[_]): TrainValidationSplitModel = {
val schema = dataset.schema
transformSchema(schema, logging = true)
val est = $(estimator)
val eval = $(evaluator)
val epm = $(estimatorParamMaps)
val numModels = epm.length
val metrics = new Array[Double](epm.length)
val numPar = $(numParallelEval)
logDebug(s"Running validation with level of parallelism: $numPar.")

val instr = Instrumentation.create(this, dataset)
instr.logParams(trainRatio, seed)
Expand All @@ -106,18 +110,21 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
trainingDataset.cache()
validationDataset.cache()

// multi-model training
logDebug(s"Train split with multiple sets of parameters.")
val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
// Fit models concurrently, limited by using a sliding window over models
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps add "sliding window of size 'numPar' over ..." - can add to all instances of this comment

val models = epm.grouped(numPar).map { win =>
win.par.map(est.fit(trainingDataset, _))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as for cross-val applies here

}.toList.flatten.asInstanceOf[Seq[Model[_]]]
trainingDataset.unpersist()
var i = 0
while (i < numModels) {
// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)))
logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
metrics(i) += metric
i += 1
}
// Evaluate models concurrently, limited by using a sliding window over models
val metrics = models.zip(epm).grouped(numPar).map { win =>
win.par.map { m =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as for cross-val applies here

// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(m._1.transform(validationDataset, m._2))
logDebug(s"Got metric $metric for model trained with ${m._2}.")
metric
}
}.toList.flatten.toArray
validationDataset.unpersist()

logInfo(s"Train validation split metrics: ${metrics.toSeq}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.SparkContext
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.shared.HasSeed
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
Expand Down Expand Up @@ -67,6 +67,17 @@ private[ml] trait ValidatorParams extends HasSeed with Params {
/** @group getParam */
def getEvaluator: Evaluator = $(evaluator)

/**
* param to control the number of models evaluated in parallel
*
* @group param
*/
val numParallelEval: IntParam = new IntParam(this, "numParallelEval",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps set this as group expertParam for now

"max number of models to evaluate in parallel, 1 for serial evaluation")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add note about default value being 1


/** @group getParam */
def getNumParallelEval: Int = $(numParallelEval)

protected def transformSchemaImpl(schema: StructType): StructType = {
require($(estimatorParamMaps).nonEmpty, s"Validator requires non-empty estimatorParamMaps")
val firstEstimatorParamMap = $(estimatorParamMaps).head
Expand Down