Conversation

zhengruifeng (Contributor) commented Sep 11, 2017

What changes were proposed in this pull request?

1. Add a handlePersistence param.
2. Add preprocess and postprocess methods in Predictor.

How was this patch tested?

Existing tests.

SparkQA commented Sep 11, 2017

Test build #81623 has finished for PR 19186 at commit f8fa957.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

zhengruifeng (Contributor, Author) commented

Jenkins, retest this please

SparkQA commented Sep 11, 2017

Test build #81626 has finished for PR 19186 at commit f8fa957.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

smurching (Contributor) left a comment

Thanks for the PR!

I mainly had one comment that appeared across multiple files (we should still check that input data is unpersisted before attempting to cache it even when handlePersistence is true).

}

- if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+ if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK)
smurching (Contributor):

If $(handlePersistence) is true, we should still check that dataset is uncached (i.e. check that dataset.storageLevel == StorageLevel.NONE) before caching instances, or else we'll run into the issues described in SPARK-21799

zhengruifeng (Contributor, Author):

+1. I had assumed it was up to users to check the storageLevel to avoid double caching, but I now agree the algorithms should check it, and it may be better to log a warning if the dataset is already cached while handlePersistence is set to true.
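For concreteness, a minimal sketch of the check-and-warn behavior proposed here (assuming the surrounding estimator mixes in Logging and has dataset/instances in scope; not this PR's final code):

if ($(handlePersistence)) {
  if (dataset.storageLevel == StorageLevel.NONE) {
    instances.persist(StorageLevel.MEMORY_AND_DISK)
  } else {
    // Guard against the double caching described in SPARK-21799.
    logWarning("handlePersistence is true, but the input dataset is already " +
      "persisted; skipping the extra cache.")
  }
}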

- // persist if underlying dataset is not persistent.
- val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
- if (handlePersistence) {
+ if ($(handlePersistence)) {
smurching (Contributor):

See comment above, we should also check that dataset.storageLevel == StorageLevel.NONE before caching newDataset

ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"),
isValid = "ParamValidators.gtEq(2)", isExpertParam = true))
isValid = "ParamValidators.gtEq(2)", isExpertParam = true),
ParamDesc[Boolean]("handlePersistence", "whether to handle data persistence", Some("true")))
smurching (Contributor):

This description could be a bit clearer, how about "if true, will cache unpersisted input data before fitting estimator on it"?
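Concretely, the codegen entry could then read (a sketch; the exact wording is the author's call):

ParamDesc[Boolean]("handlePersistence",
  "if true, will cache unpersisted input data before fitting estimator on it",
  Some("true"))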

lr.setMaxIter(optimizer.getNumIterations())
lr.setTol(optimizer.getConvergenceTol())
+ // Determine if we should cache the DF
+ lr.setHandlePersistence(input.getStorageLevel == StorageLevel.NONE)
smurching (Contributor):

handlePersistence should be specified by the user rather than inferred by the algorithm.

zhengruifeng (Contributor, Author):

mllib.LoR does not currently expose handlePersistence to users, and I think it may be better to keep it that way.

}

- if (handlePersistence) {
+ if ($(handlePersistence)) {
smurching (Contributor):

See comment above, we should also check that dataset.storageLevel == StorageLevel.NONE

val instances = extractWeightedLabeledPoints(dataset)
- val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
- if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+ if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK)
smurching (Contributor):

See comment above, we should also check that dataset.storageLevel == StorageLevel.NONE

smurching (Contributor) commented

Note: This PR follows up on the work/discussions in #17014

SparkQA commented Sep 12, 2017

Test build #81650 has finished for PR 19186 at commit e112b42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 12, 2017

Test build #81651 has finished for PR 19186 at commit 9e53579.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 13, 2017

Test build #81704 has finished for PR 19186 at commit e40d3a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class NettyMemoryMetrics implements MetricSet
  • class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: String)
  • case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long)
  • class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):
  • class HasParallelism(Params):
  • case class InsertIntoDir(
  • case class InsertIntoDataSourceDirCommand(
  • case class DescribeColumnCommand(
  • case class InsertIntoHiveDirCommand(

WeichenXu123 (Contributor):

@smurching @zhengruifeng
For estimators that inherit Predictor (such as LiR/LoR), the check if (dataset.storageLevel == StorageLevel.NONE) helps nothing: Predictor.fit generates a casted Dataset from the input Dataset, so the storage level the algorithm sees is cleared (i.e., dataset.storageLevel will always be StorageLevel.NONE, even when the input dataset is cached).
I remember this issue was discussed in the JIRA and older PRs, but this PR does not resolve it.
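This is easy to demonstrate; the following sketch is runnable in spark-shell (illustrative only, not the PR's code):

import org.apache.spark.storage.StorageLevel

val df = spark.range(10).toDF("label")
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  // materialize the cache

// Predictor.fit derives a casted Dataset from the input before calling train():
val casted = df.withColumn("label", df("label").cast("double"))

println(df.storageLevel)      // MEMORY_AND_DISK
println(casted.storageLevel)  // NONE: the derived plan is not registered as cached,
                              // so a storageLevel check inside train() sees NONE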

smurching (Contributor) commented Sep 13, 2017

Oops, yeah I had forgotten about that (thanks for the catch).

One solution could be to extend HasHandlePersistence in Predictor and check handlePersistence / cache uncached data in Predictor.fit() instead of Predictor.train(). This has the drawback of limiting individual algorithms' (e.g. LiR/LoR) ability to customize their caching behavior, which might be a deal-breaker.

@jkbradley I'd be curious to hear your thoughts on this.
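A sketch of that alternative, assuming a hypothetical HasHandlePersistence mixin supplying the param (imports of col, DoubleType, and StorageLevel assumed; not this PR's final code):

override def fit(dataset: Dataset[_]): M = {
  transformSchema(dataset.schema, logging = true)
  // Cast the label column, as Predictor.fit already does today.
  val casted = dataset.withColumn($(labelCol), col($(labelCol)).cast(DoubleType))
  // Here the original input is still visible, so the storage-level check works.
  val doPersist = $(handlePersistence) && dataset.storageLevel == StorageLevel.NONE
  if (doPersist) casted.persist(StorageLevel.MEMORY_AND_DISK)
  try {
    copyValues(train(casted).setParent(this))
  } finally {
    if (doPersist) casted.unpersist()
  }
}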

zhengruifeng (Contributor, Author) commented

@WeichenXu123 Thanks a lot for pointing it out! I also forgot about this.
@smurching Thanks for your solution; however, I think it has another drawback: an algorithm usually uses only a few columns of the dataset, such as featuresCol/labelCol, so we should not cache the whole dataset but only the columns actually used.

I think handling persistence is somewhat complex, and it should be up to the author of each algorithm to decide whether, when, and where to persist. In the current implementation of LinearRegression, if the normal solver is chosen the input is not cached, while if the L-BFGS solver is chosen the input is always cached; however, with the L-BFGS solver the input may not need caching when maxIter is set to 0 or 1.

I prefer this solution: add methods preprocess(dataset: Dataset[_]): DataFrame and postprocess(dataset: DataFrame) in Predictor, called before and after train() respectively. In preprocess we do the casting and persisting; in postprocess we unpersist the intermediate DataFrame if needed. Algorithms can override either method for their specific needs, as sketched below.
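A sketch of that shape in Predictor (method names from the proposal; the bodies are illustrative defaults, not this PR's final code):

// Default preprocess: select only the columns the algorithm needs, cast the
// label, and persist if requested and the input is not already cached.
protected def preprocess(dataset: Dataset[_]): DataFrame = {
  val selected = dataset.select(
    col($(labelCol)).cast(DoubleType).as($(labelCol)),
    col($(featuresCol)))
  if ($(handlePersistence) && dataset.storageLevel == StorageLevel.NONE) {
    selected.persist(StorageLevel.MEMORY_AND_DISK)
  }
  selected
}

// Default postprocess: release the intermediate DataFrame if it was cached.
protected def postprocess(dataset: DataFrame): Unit = {
  if (dataset.storageLevel != StorageLevel.NONE) dataset.unpersist()
}

override def fit(dataset: Dataset[_]): M = {
  transformSchema(dataset.schema, logging = true)
  val prepared = preprocess(dataset)   // algorithms may override
  val model = copyValues(train(prepared).setParent(this))
  postprocess(prepared)
  model
}

An algorithm like LinearRegression could then override preprocess to skip persisting when the normal solver is selected or maxIter is 0 or 1.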

SparkQA commented Sep 14, 2017

Test build #81751 has finished for PR 19186 at commit aa04d4b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 14, 2017

Test build #81752 has finished for PR 19186 at commit 74445cd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 14, 2017

Test build #81762 has finished for PR 19186 at commit 3f11c67.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 14, 2017

Test build #81763 has finished for PR 19186 at commit 29f38e4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

zhengruifeng (Contributor, Author) commented

retest this please

SparkQA commented Sep 14, 2017

Test build #81771 has finished for PR 19186 at commit 29f38e4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

jkbradley (Member) commented

@zhengruifeng Can you please update the PR description so it describes the actual functionality being added?

jkbradley (Member) commented

This has ended up being more complex than we envisioned. It would be valuable to describe the design succinctly so that people can debate it on JIRA. Could you please describe your solution on the JIRA and ping people who have been discussing this? (I also made a request on the JIRA which I'd like to see addressed.) Thanks!

WeichenXu123 (Contributor) commented

@zhengruifeng You should write a succinct design doc and link it from the JIRA first. Once we reach agreement on the design, this PR can move forward. Thanks!

SparkQA commented Sep 26, 2017

Test build #82179 has finished for PR 19186 at commit 18f9903.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

zhengruifeng (Contributor, Author) commented

retest this please

SparkQA commented Sep 26, 2017

Test build #82186 has finished for PR 19186 at commit 18f9903.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 26, 2017

Test build #82200 has finished for PR 19186 at commit c168e7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

zhengruifeng deleted the fix_double_cache branch on August 21, 2019.