[SPARK-21027][ML][PYTHON] Added tunable parallelism to one vs. rest in both Scala mllib and Pyspark #18281
Conversation
Added tunable parallelism to the pyspark implementation of one vs. rest classification. Added a parallelism parameter to the Scala implementation of one vs. rest for Python persistence, but have not yet used it to tune the parallelism of the Scala implementation.
add to whitelist
Test build #77948 has finished for PR 18281 at commit
Test build #77950 has finished for PR 18281 at commit
Test build #77951 has finished for PR 18281 at commit
…n of the one vs. rest algorithm.
Test build #77954 has finished for PR 18281 at commit
taking a look now
jkbradley left a comment:
Thanks for the PR! My comments are mostly about the unit tests.
Also, since SPARK-21028 has been closed, can you please remove it from the PR title?
```scala
def getClassifier: ClassifierType = $(classifier)

val parallelism = new IntParam(this, "parallelism",
  "parallelism parameter for tuning amount of parallelism", ParamValidators.gtEq(1))
```
This is not very informative. Can you please make it more explicit?
Also
- add Scala doc
- add Since annotation
- Mark this as an expertParam (the "group" annotation in the Scala doc); see other parts of the code for examples
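For reference, a sketch of what addressing all three points could look like (the version number and description wording here are illustrative, not from the PR):

```scala
import org.apache.spark.annotation.Since
import org.apache.spark.ml.param.{IntParam, ParamValidators}

/**
 * Param for the number of threads to use when running parallel algorithms (>= 1).
 * @group expertParam
 */
@Since("2.3.0")
val parallelism: IntParam = new IntParam(this, "parallelism",
  "the number of threads to use when running parallel algorithms", ParamValidators.gtEq(1))
```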
```scala
  set(classifier, value.asInstanceOf[ClassifierType])
}

/** @group setParam */
```
next release will be 2.3
```scala
assert(ova.getPredictionCol === "prediction")
val ovaModel = ova.fit(dataset)

MLTestingUtils.checkCopyAndUids(ova, ovaModel)
```
This is a generic test which only needs to be done in 1 test for each algorithm. You can remove it here.
```scala
val ova = new OneVsRest()
  .setClassifier(new LogisticRegression)
  .setParallelism(8)
assert(ova.getLabelCol === "label")
```
No need to check Params. That's not what this unit test is for.
There are several things like that below; just go through and remove items which are not part of this unit test.
```scala
  assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
}

test("one-vs-rest: tuning parallelism produces correct output") {
```
"produces correct output" --> "does not affect output"
This test appears to check OVR vs. another algorithm. I think a more precise test would check that tuning parallelism still produces exactly the same models. Could you please update it to do so?
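A minimal sketch of the suggested test, assuming the `setParallelism` setter from this PR's diff and the `dataset` fixture used by the existing OneVsRest tests:

```scala
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest}

test("one-vs-rest: tuning parallelism does not affect output") {
  // Fit the same estimator serially and in parallel; per-class training is
  // deterministic, so the resulting models should be identical.
  val ovaSerial = new OneVsRest().setClassifier(new LogisticRegression).setParallelism(1)
  val ovaParallel = new OneVsRest().setClassifier(new LogisticRegression).setParallelism(2)
  val modelSerial = ovaSerial.fit(dataset)
  val modelParallel = ovaParallel.fit(dataset)
  modelSerial.models.zip(modelParallel.models).foreach {
    case (m1: LogisticRegressionModel, m2: LogisticRegressionModel) =>
      assert(m1.coefficients === m2.coefficients)
      assert(m1.intercept === m2.intercept)
  }
}
```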
python/pyspark/ml/classification.py (outdated diff):

```python
        return self._copyValues(OneVsRestModel(models=models))

    def setParallelism(self, value):
```
Add Since annotations
python/pyspark/ml/classification.py (outdated diff):

```diff
-        # TODO: Parallel training for all classes.
-        models = [trainSingleClass(i) for i in range(numClasses)]
+        pool = ThreadPool(processes=self.getParallelism())
```
One new thought: It'd be good to set processes to min(parallelism, numClasses). Same in Scala.
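In Scala terms the suggestion amounts to this sketch (`getParallelism` and `numClasses` come from the surrounding OneVsRest code):

```scala
// Never create more threads than there are binary problems to train;
// any extra threads would only sit idle.
val numThreads = math.min(getParallelism, numClasses)
```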
python/pyspark/ml/tests.py (outdated diff):

```python
class ParOneVsRestTests(SparkSessionTestCase):

    def test_copy(self):
```
What is this testing?
python/pyspark/ml/tests.py (outdated diff):

```python
        model1 = model.copy({model.predictionCol: "indexed"})
        self.assertEqual(model1.getPredictionCol(), "indexed")

    def test_output_columns(self):
```
Ditto: is this needed?
How about adding a test like the one I proposed for Scala, which makes sure the same model is learned regardless of parallelism?
```scala
  multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
}

val iters = Range(0, numClasses).par
```
CC @thunterdb just to double-check
@jkbradley thanks for calling this out. Indeed, the code as it stands may cause some non-deterministic issues in complex environments. I put a comment about that in that PR:
#16774
See how it is done in this file:
https://github.com/apache/spark/pull/16774/files#diff-d14539cadce0fba9d8b7d970adaf8b26
It should be a quick change.
Discussed offline with @thunterdb : Setting the parallel collection tasksupport value is essentially doing the same thing as in @BryanCutler 's PR: ForkJoinTaskSupport is using the ExecutionContext created by ForkJoinPool under the hood.
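For reference, a sketch of the dedicated-pool approach being discussed (imports are for Scala 2.11, where parallel collections use Scala's own ForkJoinPool):

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Give the parallel collection its own pool instead of the JVM-wide default,
// so OneVsRest training cannot starve other tasks in a shared application.
val iters = Range(0, numClasses).par
iters.tasksupport = new ForkJoinTaskSupport(
  new ForkJoinPool(math.min(getParallelism, numClasses)))
```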
…ts for testing that parallelism doesn't affect the output.
Test build #78004 has started for PR 18281 at commit
thunterdb left a comment:
@ajaysaini725 using Scala's parallel collections is not going to work well in that context; see my comment. It is easy to fix by looking at the linked file.
Thanks @thunterdb, I was just about to comment on the similarities to #16774. It would probably be best to align a couple of things between the two: the Param name, the default level of parallelism, and whether to expose the ability to pass in an ExecutorService.

What are your thoughts on these?
test this please
Test build #78009 has finished for PR 18281 at commit
I agree; it'd be good to match on the Param name. Do you think "parallelism" is too vague? If not, then I like it since it's simple.

I'd vote for default parallelism of 1 to be safe (match current CrossValidator behavior). We can change this PR to use 1 as well for uniformity.

Do we really need to expose the ability to pass in an ExecutorService? Given how few people have requested any kind of parallelism, I have a hard time imagining people needing to pass in an ExecutorService. If one of you has a need for it, I guess that's fine, but I'd recommend it be marked as a DeveloperApi and clearly documented (e.g., not saved by persistence, etc.).
Originally in the CV PR Bryan used par collections. Tim pointed out that perhaps futures would be better, and suggested adding the ability to use different custom threadpools. However, I think the intention was the same as here, i.e. to just ensure the threadpool is a new dedicated one and not the default, in order not to block multi-tenant apps.

I tend to prefer the par collection approach for simplicity and readability. Also, if we can share the param then great; "parallelism" works I think and is generic enough to be used by multiple components.

Also, perhaps this wouldn't work with Scala 2.10? I think that may be OK now since we intend to drop support in 2.3.x?

If we agree on the approach, then the CV PR can be changed back to using par collections with custom task support.
You're right about Scala being an issue. This actually works with Scala 2.10 and 2.11 but not 2.12, in which Scala drops its own ForkJoinPool in favor of the Java one. As long as we drop 2.10 before adding 2.12, we'll be OK. @BryanCutler do par collections seem workable?
One comment about putting parallelism in a trait vs. not: would we want to avoid creating a threadpool when parallelism = 1? In that (common) case, maybe we'd want to avoid par collections. If so, then the trait could provide a helper method which takes a range/seq and a function accepting values from that range. That helper could use par collections or not, depending on the setting of parallelism. Good, or too complex?
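A hypothetical sketch of that helper (the trait and method names are illustrative, not from the PR):

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

trait HasParallelismHelper {
  def getParallelism: Int

  /**
   * Maps f over seq, using a dedicated parallel-collection pool only when
   * parallelism > 1; the common parallelism = 1 case stays on the calling
   * thread with no thread-pool overhead.
   */
  def mapWithParallelism[T, U](seq: Seq[T])(f: T => U): Seq[U] = {
    if (getParallelism <= 1) {
      seq.map(f)
    } else {
      val parSeq = seq.par
      parSeq.tasksupport = new ForkJoinTaskSupport(
        new ForkJoinPool(math.min(getParallelism, seq.length)))
      parSeq.map(f).seq
    }
  }
}
```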
Thanks for working on this PR, looks good overall. I think we just need to agree on the best approach for this kind of thing.
```scala
val iters = Range(0, numClasses).par
iters.tasksupport = new ForkJoinTaskSupport(
  new ForkJoinPool(Math.min(getParallelism, numClasses))
)
```
I don't think ForkJoinPool is the best thing to use here. It's more geared for a bunch of small tasks that might spawn other tasks. Just a regular thread-pool is fine. Also, it is a little better for the case of parallelism 1 to use sameThreadExecutor. This will just run in the current thread and has no overhead (equivalent to running in serial), where a thread-pool of size 1 will still create another thread and does have some overhead.
That sounds like a good way to implement my suggestion above. +1
Actually, quick question: Where are docs for sameThreadExecutor? ...having trouble finding API docs on Google.
It's already in Spark as ThreadUtils.sameThread
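Putting those two comments together, the selection could look like this sketch (the method shape is assumed; `ThreadUtils.sameThread` and `ThreadUtils.newDaemonCachedThreadPool` are existing Spark utilities):

```scala
import scala.concurrent.ExecutionContext
import org.apache.spark.util.ThreadUtils

// With parallelism = 1, run fits on the calling thread (no extra thread,
// no overhead); otherwise hand them to a dedicated daemon thread pool.
private def getExecutionContext: ExecutionContext = {
  getParallelism match {
    case 1 =>
      ThreadUtils.sameThread
    case n =>
      ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonCachedThreadPool("one-vs-rest-pool", n))
  }
}
```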
```diff
     // create k columns, one for each binary classifier.
-    val models = Range(0, numClasses).par.map { index =>
+    val models = iters.map { index =>
       // generate new label metadata for the binary problem.
```
Using Futures is a simple change and imho is clearer than using parallel collections once you start messing around with the task support. It would just require changing a couple lines to something like this:

```scala
// ...
  Future { classifier.fit(trainingDataset, paramMap) }(executionContext)
}.map(ThreadUtils.awaitResult(_, Duration.Inf))
```
What do Futures add? They require more code, and the functionality seems to be the same for the purposes of OneVsRest.
They don't necessarily add anything here, but they are a more standard way of doing parallelism in Spark than TaskSupport, and they are more flexible for setting an ExecutorService. I'm not sure if you can set TaskSupport to sameThreadExecutor, or what really happens behind the scenes if you make a ThreadPoolTaskSupport with 1 thread.
```scala
val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
val metricsPar4 = new MulticlassMetrics(ovaResultsPar4)
assert(metricsPar2.confusionMatrix ~== metricsPar4.confusionMatrix absTol 400)
```
I think this test would be better if you just compared single-threaded vs. multi-threaded.
+1
@BryanCutler Thanks for the thoughts! I didn't see a response w.r.t. putting parallelism in a trait, so I'll say we won't do it for now. The usage of par collections / Futures in OneVsRest vs. tuning looks a bit different.
I think this would be a good idea and it could be shared with CrossValidator.

It is a little different. For CrossValidator, Futures are nice because I could add a callback to unpersist training data once all training is complete, and can continue to evaluate each model as soon as it is ready instead of blocking.
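A sketch of that pattern for CrossValidator (`est`, `epm`, `eval`, the datasets, and `getExecutionContext` are assumed from that PR's context, not defined here):

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.util.ThreadUtils

implicit val executionContext: ExecutionContext = getExecutionContext

// Kick off one fit per param map.
val modelFutures = epm.toSeq.map { paramMap =>
  Future { est.fit(trainingDataset, paramMap) }
}

// Callback: unpersist the training split as soon as every fit completes.
Future.sequence(modelFutures).onComplete { _ =>
  trainingDataset.unpersist()
}

// Evaluate each model as soon as it is ready instead of blocking on all fits.
val metricFutures = modelFutures.map(_.map { model =>
  eval.evaluate(model.transform(validationDataset))
})
val metrics = metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
```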
Catching up here, it sounds like the current recommendations (which I'm on board with) are to:
Test build #79985 has finished for PR 18281 at commit
@holdenk Some of those improvements on handling parallelism sound useful, but I'd prefer we merge this and then add more improvements. This PR should be a strict improvement there (moving from no parallelism to some potential for parallelism).
```scala
def getParallelism: Int = $(parallelism)

/** @group expertSetParam */
def setParallelism(value: Int): this.type = {
```
You can remove this now that it is in OneVsRest
```scala
def setPredictionCol(value: String): this.type = set(predictionCol, value)

/** @group expertGetParam */
override def getParallelism: Int = $(parallelism)
```
This one can just go in the trait right?
```diff
                  classifier=None, parallelism=1):
         """
-        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None):
+        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \
```
The default args here in the doc should match the method (for featuresCol, labelCol and predictionCol)
```scala
ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
  case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
    assert(lrModel1.coefficients === lrModel2.coefficients)
```
Perhaps we should use the approx equal version for vectors and matrices here and above? It seems the test does pass, but perhaps that would be better, to avoid future flakiness for whatever reason. Also, we do so in the Python tests so it would be more consistent.
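Concretely, the approximate comparison would look like this (the tolerance value is illustrative):

```scala
import org.apache.spark.ml.util.TestingUtils._

assert(lrModel1.coefficients ~== lrModel2.coefficients absTol 1e-9)
assert(lrModel1.intercept ~== lrModel2.intercept absTol 1e-9)
```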
@ajaysaini725 Could you resolve merge conflicts and address the remaining outstanding review comments? I left a few minor additional comments. Overall I think this is just about ready.
I also think we can leave any potential improvements for parallelism on the Python side (as well as the test side, if we come up with a good way of testing that fitting is actually being done in parallel) for a later PR.
| None, "TypeConverters.toString"), | ||
| ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2", | ||
| "TypeConverters.toInt"), | ||
| ("parallelism", "number of threads to use when fitting models in parallel.", "1", |
This should probably indicate that the value range is "(>= 1)" at the end of the description.
Test build #80554 has finished for PR 18281 at commit
Force-pushed from 9328117 to 585a3f8
Test build #80560 has finished for PR 18281 at commit
@ajaysaini725 can you fix the outstanding issues here? I would like to make use of the …
By the way, we should note in the migration guide that this is a behavior change on the Scala side - currently …
Also to be clear, I'm fine with the changes I've suggested being left for a follow-up (but if we do go ahead and merge this without those changes, let's make it an explicit follow-up task).
```scala
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
<<<<<<< HEAD
```
some sort of merge problem? this shouldn't be in a commit
Test build #81043 has finished for PR 18281 at commit
Test build #81045 has finished for PR 18281 at commit
Test build #3899 has finished for PR 18281 at commit
I'm taking this PR over in #19110 because the original author is busy, but we need to merge this PR soon.
Thanks @ajaysaini725 for your work (and permission given offline to take this over)! We can close this issue now.
Added tunable parallelism to the pyspark implementation of one vs. rest classification. Added a parallelism parameter to the Scala implementation of one vs. rest along with functionality for using the parameter to tune the level of parallelism.