Closed
Changes from 2 commits
@@ -18,6 +18,7 @@
package org.apache.spark.ml.tuning

import java.util.{List => JList, Locale}
import java.util.concurrent.atomic.AtomicInteger
Member

not needed anymore


import scala.collection.JavaConverters._
import scala.concurrent.Future
@@ -146,25 +147,18 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
logDebug(s"Train split $splitIndex with multiple sets of parameters.")

val completeFitCount = new AtomicInteger(0)
Contributor

My understanding of Scala futures may be off here, but this seems to change the behavior to me. Now, the unpersist operation will happen in one of the training threads, instead of asynchronously in its own thread. I'm not sure how much of an effect that will have.

Why can't you just put all the logic in one map statement, like below?

      // Fit and evaluate each model in a single Future so training runs in parallel
      val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]

          if (collectSubModelsParam) {
            subModels.get(splitIndex)(paramIndex) = model
          }
          // TODO: duplicate evaluator to take extra params from input
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
        } (executionContext)
      }

      // Unpersist training data only when all futures have completed
      Future.sequence[Double, Iterable](modelFutures)(implicitly, executionContext)
        .onComplete { _ => trainingDataset.unpersist() } (executionContext)

Contributor Author

We want to unpersist the training dataset as soon as all fitting has finished, but your suggestion postpones the unpersist until all fitting and evaluation are done. Your code should have the same effect as:

val modelFutures = ...
val foldMetrics = modelFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
trainingDataset.unpersist()
validationDataset.unpersist()

And regarding what you said:

"Now, the unpersist operation will happen in one of the training threads, instead of asynchronously in its own thread."

What is the possible impact? trainingDataset.unpersist() is itself asynchronous and won't block, so would it have any effect? I think it can safely be called from any thread.

Contributor

You can also turn a sequence of futures into a single future, then map on that future and do the unpersist there.
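
For readers following along, here is a minimal, self-contained sketch of that pattern outside Spark. Everything in it is an assumption for illustration: `FakeDataset` is a hypothetical stand-in for the cached training data, the squared integers stand in for fitted models, and the global execution context replaces the `executionContext` used in the PR.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceThenCleanup {
  // Hypothetical stand-in for a cached dataset; not a Spark class.
  final class FakeDataset {
    @volatile var cached: Boolean = true
    def unpersist(): Unit = { cached = false }
  }

  // Returns the task results and whether the dataset is still cached afterwards.
  def run(numTasks: Int): (Seq[Int], Boolean) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val data = new FakeDataset

    // Each future stands in for one est.fit call that reads the cached data.
    val fits: Seq[Future[Int]] = (1 to numTasks).map { i =>
      Future {
        require(data.cached, "dataset was released too early")
        i * i
      }
    }

    // Collapse into one future and release the data once every task has succeeded.
    val all: Future[Seq[Int]] = Future.sequence(fits).map { results =>
      data.unpersist()
      results
    }

    val results = Await.result(all, 10.seconds)
    (results, data.cached)
  }
}
```

Note that `map` only runs when every future succeeds. Attaching the cleanup with `onComplete` instead, as the earlier snippet in this thread does, would also release the data when a fit fails.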

Contributor Author

@WeichenXu123 WeichenXu123 Dec 12, 2017

@holdenk Yes, that's what the current master code does. But done that way, the future has to be split into a modelFuture and a foldMetricFuture, so it cannot avoid the issue that modelFuture still holds the computed model after the future completes, which causes the memory issue.

Contributor

@MrBago MrBago Dec 12, 2017

You can use futures to do this, you need to use a var for modelFutures, then map on those futures to Unit, then collect those into a sequence, then map on that to unpersist, and also set modelFutures to null to release those references, but why go to the trouble? What's the concern with doing it in the final training thread? Why is this a change in behavior?
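
As a rough illustration of doing the cleanup in the final training thread, the counter idea can be sketched in isolation. This is a sketch under stated assumptions and not the PR's code: the arithmetic stands in for `est.fit` and `eval.evaluate`, the `cleanups` counter is a hypothetical stand-in for `trainingDataset.unpersist()`, and the global execution context replaces the PR's `executionContext`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LastFinisherCleanup {
  // Returns the per-task results and how many times the cleanup ran.
  def run(numTasks: Int): (Seq[Int], Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val completed = new AtomicInteger(0)
    val cleanups = new AtomicInteger(0) // hypothetical stand-in for unpersist()

    val tasks: Seq[Future[Int]] = (1 to numTasks).map { i =>
      Future {
        val fitted = i * 2 // stand-in for fitting a model
        // incrementAndGet is atomic, so exactly one task observes the final count.
        if (completed.incrementAndGet() == numTasks) {
          cleanups.incrementAndGet()
        }
        fitted + 1 // stand-in for evaluating the model
      }
    }

    val results = Await.result(Future.sequence(tasks), 10.seconds)
    (results, cleanups.get())
  }
}
```

Because each task returns only its metric, nothing keeps the fitted value alive once the task finishes. One caveat of this sketch: if a task throws before incrementing the counter, the count never reaches `numTasks` and the cleanup does not run.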

Contributor Author

@MrBago About what you said:

You can use futures to do this, you need to use a var for modelFutures, then map on those futures to Unit, then collect those into a sequence, then map on that to unpersist, and also set modelFutures to null to release those references

Can you post some pseudocode so I can check whether it works and what its peak memory usage is?
Thanks!

Contributor

I think this might work, but I think what you have in the PR now is better.

      // Fit models in a Future for training in parallel
      var modelFutures = epm.map { paramMap =>
        Future[Model[_]] {
          val model = est.fit(trainingDataset, paramMap)
          model.asInstanceOf[Model[_]]
        } (executionContext)
      }

      // Unpersist training data only when all models have trained
      val unitFutures = modelFutures.map{ _.map{ _ => () } (executionContext) }
      Future.sequence[Unit, Iterable](unitFutures)(implicitly, executionContext)
        .onComplete { _ => trainingDataset.unpersist() } (executionContext)

      // Evaluate models in a Future that will calculate a metric and allow model to be cleaned up
      val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
        modelFuture.map { model =>
          // TODO: duplicate evaluator to take extra params from input
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
        } (executionContext)
      }
      modelFutures = null

// Fit models in a Future for training in parallel
val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
Future[Model[_]] {
val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
Future[Double] {
val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
if (completeFitCount.incrementAndGet() == epm.length) {
trainingDataset.unpersist()
}

if (collectSubModelsParam) {
subModels.get(splitIndex)(paramIndex) = model
}
model
} (executionContext)
}

// Unpersist training data only when all models have trained
Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
.onComplete { _ => trainingDataset.unpersist() } (executionContext)

// Evaluate models in a Future that will calulate a metric and allow model to be cleaned up
val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
modelFuture.map { model =>
// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(model.transform(validationDataset, paramMap))
logDebug(s"Got metric $metric for model trained with $paramMap.")