-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22397][ML]add multiple columns support to QuantileDiscretizer #19715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Jenkins add to whitelist |
|
Test build #83685 has finished for PR 19715 at commit
|
|
Test build #83729 has finished for PR 19715 at commit
|
| @Since("1.6.0") | ||
| final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val uid: String) | ||
| extends Estimator[Bucketizer] with QuantileDiscretizerBase with DefaultParamsWritable { | ||
| extends Estimator[Bucketizer] with QuantileDiscretizerBase with DefaultParamsWritable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks a bit weird to have HasInputCols and HasOutputCols directly in QuantileDiscretizer and leave other params in QuantileDiscretizerBase.
But extending HasInputCols and HasOutputCols in QuantileDiscretizerBase causes binary compatibility issue. I think we don't want to break the compatibility in the effort of adding multi-col support.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I will leave this as is even though it's a bit weird.
| } | ||
| bucketizer.setSplitsArray(distinctSplitsArray.toArray) | ||
| copyValues(bucketizer.setParent(this)) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style issue:
...
} else {
...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix this. And fix the same problem in another place.
| val data = (0 until 100000).map { idx => | ||
| (data1(idx), data2(idx)) | ||
| } | ||
| val df: DataFrame = data.toSeq.toDF("input1", "input2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: No need for the explicit type DataFrame.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove DataFrame
| val data2 = Array.range(1, 200000, 2).map(_.toDouble) | ||
| val data = (0 until 100000).map { idx => | ||
| (data1(idx), data2(idx)) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val data seems just as data1.zip(data2)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Will change to data1.zip(data2)
| val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) | ||
| val data = (0 until data1.length).map { idx => | ||
| (data1(idx), data2(idx)) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use data1.zip(data2)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change to data1.zip(data2).
| val data = (0 until data1.length).map { idx => | ||
| (data1(idx), data2(idx)) | ||
| } | ||
| val df: DataFrame = data.toSeq.toDF("input1", "input2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Remove DataFrame.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove DataFrame.
| val numBucketsArray: Array[Int] = Array(2, 5, 10) | ||
| val data1 = Array.range(1, 21, 1).map(_.toDouble) | ||
| val expected1 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 4.0, 4.0, 5.0, | ||
| 5.0, 5.0, 6.0, 6.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this correct? I tried to apply the same data on current QuantileDiscretizer:
val data1 = Array.range(1, 21, 1).map(_.toDouble)
val df = data1.toSeq.toDF
val discretizer = new QuantileDiscretizer().setInputCol("value").setOutputCol("result").setNumBuckets(2)
discretizer.fit(df).transform(df).show+-----+------+
|value|result|
+-----+------+
| 1.0| 0.0|
| 2.0| 0.0|
| 3.0| 0.0|
| 4.0| 0.0|
| 5.0| 0.0|
| 6.0| 0.0|
| 7.0| 0.0|
| 8.0| 0.0|
| 9.0| 0.0|
| 10.0| 1.0|
| 11.0| 1.0|
| 12.0| 1.0|
| 13.0| 1.0|
| 14.0| 1.0|
| 15.0| 1.0|
| 16.0| 1.0|
| 17.0| 1.0|
| 18.0| 1.0|
| 19.0| 1.0|
| 20.0| 1.0|
+-----+------+
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we are going to get all the probabilities derived from the numBucketsArray and use them for all the columns. In this case, all the probabilities for numBucketsArray (2,5,10) are (0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0). I am using these probabilities for all the input columns. In another word, I am using numsBuckets 10 for all the input columns. Is this right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think to set numBucketsArray, the first number of bucket is for first column, although we retrieve the approx-quantile for all probabilities at once.
|
It would be great if multiple columns support could be extended to the some of the other transformers as well like StringIndexer. |
|
@AFractalThought For StringIndexer, there is already SPARK-11215. |
|
Test build #83994 has finished for PR 19715 at commit
|
MLnick
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay. Made a pass and some comments.
| private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { | ||
| if (isSet(inputCols) && isSet(inputCol)) { | ||
| logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + | ||
| "`QuantileDiscretize` only map one column specified by `inputCol`") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'only map' -> 'will only map'
| private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { | ||
| if (isSet(inputCols) && isSet(inputCol)) { | ||
| logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + | ||
| "`QuantileDiscretize` only map one column specified by `inputCol`") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QuantileDiscretize -> QuantileDiscretizer
| val numBucketsArray = new IntArrayParam(this, "numBucketsArray", "Array of number of buckets " + | ||
| "(quantiles, or categories) into which data points are grouped. This is for multiple " + | ||
| "columns input. If numBucketsArray is not set but numBuckets is set, it means user wants " + | ||
| "to use the same numBuckets across all columns.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need a validator function here to ensure all bucket values >= 2
| */ | ||
| val numBucketsArray = new IntArrayParam(this, "numBucketsArray", "Array of number of buckets " + | ||
| "(quantiles, or categories) into which data points are grouped. This is for multiple " + | ||
| "columns input. If numBucketsArray is not set but numBuckets is set, it means user wants " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"If transforming multiple columns and numBucketsArray is not set, but numBuckets is set, then numBuckets will be applied across all columns."
| * categorical features. The number of bins can be set using the `numBuckets` parameter. It is | ||
| * possible that the number of buckets used will be smaller than this value, for example, if there | ||
| * are too few distinct values of the input to create enough distinct quantiles. | ||
| * Since 2.3.0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's match the Bucketizer comment. So something like:
...
Since 2.3.0, `QuantileDiscretizer ` can map multiple columns at once by setting the `inputCols` parameter.
Note that when both the `inputCol` and `inputCols` parameters are set, a log warning will be printed and
only `inputCol` will take effect, while `inputCols` will be ignored. To specify the number of buckets
for each column , the `numBucketsArray ` parameter can be set, or if the number of buckets should be the
same across columns, `numBuckets` can be set as a convenience.
| def getNumBuckets: Int = getOrDefault(numBuckets) | ||
|
|
||
| /** | ||
| * Array of number of buckets (quantiles, or categories) into which data points are grouped. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can add a comment about "each value must be greater than or equal to 2"
| (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) | ||
| val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) | ||
| if (isQuantileDiscretizeMultipleColumns) { | ||
| var bucketArray = Array.empty[Int] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val bucketSeq = if (isSet(numBucketsArray)) {
$(numBucketsArray).toSeq
} else {
Seq($(numBuckets))
}| val splits = dataset.stat.approxQuantile($(inputCol), | ||
| (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) | ||
| val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) | ||
| if (isQuantileDiscretizeMultipleColumns) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section overall seems like it can be cleaned up - it should be possible to have one code path for a Seq of numBuckets and at the end if transforming only one column the splits array should be the first element.
You could check the case of a single numBuckets value and Array.fill that value (if numBucketsArray is not set).
| val model = discretizer.fit(df) | ||
| assert(model.hasParent) | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add 2 tests:
- test setting
numBucketsis the same as settingnumBucketsArrayexplicitly with identical values - test that QD over multiple columns produces the same results as 2x QDs over the same columns (as we did for Bucketizer)
|
@huaxingao for posterity and recording purposes, could you post the performance comparison between the approach used here (of merging together all the probabilities into one array for |
|
Test build #84357 has finished for PR 19715 at commit
|
MLnick
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made a pass and comment on fit. I will keep reviewing the test cases and revert with any further comments.
| * `QuantileDiscretizer ` can map multiple columns at once by setting the `inputCols` parameter. | ||
| * Note that when both the `inputCol` and `inputCols` parameters are set, a log warning will be | ||
| * printed and only `inputCol` will take effect, while `inputCols` will be ignored. To specify | ||
| * the number of bucketsfor each column , the `numBucketsArray ` parameter can be set, or if the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"bucketsfor" -> "buckets for"
and remove the leading space from " number of buckets ..." on next line
| val (inputColNames, outputColNames) = getInOutCols | ||
| val existingFields = schema.fields | ||
| var outputFields = existingFields | ||
| inputColNames.zip(outputColNames).map { case (inputColName, outputColName) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
map can be foreach because there's no return value
| transformSchema(dataset.schema, logging = true) | ||
| val splits = dataset.stat.approxQuantile($(inputCol), | ||
| (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) | ||
| val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this now, the Array.fill approach probably adds needless complexity.
But the multi-buckets case can perhaps still be cleaned up. How about something like this:
override def fit(dataset: Dataset[_]): Bucketizer = {
transformSchema(dataset.schema, logging = true)
val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
if (isQuantileDiscretizeMultipleColumns) {
val splitsArray = if (isSet(numBucketsArray)) {
val probArrayPerCol = $(numBucketsArray).map { numOfBuckets =>
(0.0 to 1.0 by 1.0 / numOfBuckets).toArray
}
val probabilityArray = probArrayPerCol.flatten.sorted.distinct
val splitsArrayRaw = dataset.stat.approxQuantile($(inputCols),
probabilityArray, $(relativeError))
splitsArrayRaw.zip(probArrayPerCol).map { case (splits, probs) =>
val probSet = probs.toSet
val idxSet = probabilityArray.zipWithIndex.collect {
case (p, idx) if probSet(p) =>
idx
}.toSet
splits.zipWithIndex.collect {
case (s, idx) if idxSet(idx) =>
s
}
}
} else {
dataset.stat.approxQuantile($(inputCols),
(0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
}
bucketizer.setSplitsArray(splitsArray.map(getDistinctSplits))
} else {
val splits = dataset.stat.approxQuantile($(inputCol),
(0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
bucketizer.setSplits(getDistinctSplits(splits))
}
copyValues(bucketizer.setParent(this))
}Then we don't need getSplitsForEachColumn method (or part of the above could be factored out into a private method if it makes sense).
|
@MLnick Thank you very much for your comments! I will change these. |
|
Test build #84674 has finished for PR 19715 at commit
|
|
|
||
| private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { | ||
| if (isSet(inputCols) && isSet(inputCol)) { | ||
| logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the discussion result at JIRA SPARK-8418, we should throw exception when both inputCol and inputCols are specified ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@WeichenXu123 I will change to throw Exception. Thanks.
|
Test build #84753 has finished for PR 19715 at commit
|
| if (!isQuantileDiscretizeMultipleColumns) { | ||
| require((isSet(inputCol) && isSet(outputCol) && !isSet(inputCols) && !isSet(outputCols)) || | ||
| (!isSet(inputCol) && !isSet(outputCol) && isSet(inputCols) && isSet(outputCols)), | ||
| "Only allow to set either inputCol/outputCol, or inputCols/outputCols" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a better message is something like "QuantileDiscretizer only supports setting either ..."
| * number of buckets should be the same across columns, `numBuckets` can be set as a convenience. | ||
| * `QuantileDiscretizer` can map multiple columns at once by setting the `inputCols` parameter. | ||
| * Note that only one of `inputCol` and `inputCols` parameters can be set. If both of the | ||
| * `inputCol` and `inputCols` parameters are set, an Exception will be thrown. To specify the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think we can simplify to "If both inputCol and inputCols are set, ..." (since we already said in the previous sentence that only one of the parameters can be set)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MLnick Thank you very much for your comments. I will change these.
|
Test build #84780 has finished for PR 19715 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, made another pass. Overall tests look good. Made a few small comments.
As I think @jkbradley mentioned elsewhere, we should just check that we don't break save/load back compat (it should be ok but let's confirm).
So, if we create and save a QD before this PR and load it after this PR, it will still work fine.
| val dataFrame: DataFrame = validData1.zip(validData2).zip(v).zip(w).map { | ||
| case (((a, b), c), d) => (a, b, c, d) | ||
| }.toSeq.toDF("input1", "input2", "expected1", "expected2") | ||
| dataFrame.show |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the show call here
| }.toSeq.toDF("input1", "input2", "expected1", "expected2") | ||
| dataFrame.show | ||
| val result = discretizer.fit(dataFrame).transform(dataFrame) | ||
| result.show |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here too
| import spark.implicits._ | ||
|
|
||
| val datasetSize = 20 | ||
| val numBucketsArray: Array[Int] = Array(2, 5, 10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unused?
| val data1 = Array.range(1, 21, 1).map(_.toDouble) | ||
| val data2 = Array.range(1, 40, 2).map(_.toDouble) | ||
| val data3 = Array.range(1, 60, 3).map(_.toDouble) | ||
| val data = (0 until 20).map { idx => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use datasetSize here? Or remove datasetSize as it's unused.
| val df = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)) | ||
| .map(Tuple1.apply).toDF("input") | ||
| // When both inputCol and inputCols are set, we throw Exception. | ||
| intercept[Exception] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe intercept IllegalArgumentException to be more specific.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MLnick Thanks a lot for your comments. I will change these.
| if (isSet(inputCol)) { | ||
| (Array($(inputCol)), Array($(outputCol))) | ||
| } else { | ||
| require($(inputCols).length == $(outputCols).length, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a small test case for mismatched sizes of inputCols / outputCols.
|
I have also verified the save/load back compatibility. |
|
Test build #84974 has finished for PR 19715 at commit
|
|
retest this please |
|
Test build #85199 has finished for PR 19715 at commit
|
| discretizer.fit(df) | ||
| test("multiple columns: Both inputCol and inputCols are set") { | ||
| intercept[IllegalArgumentException] { | ||
| new QuantileDiscretizer().setInputCol("in").setInputCols(Array("in1", "in2")).getInOutCols |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I slightly prefer to actually test that the error is thrown during transform
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for spending so much time to review this PR. I will change this.
|
Test build #85275 has finished for PR 19715 at commit
|
|
Thanks for the changes @huaxingao. This LGTM now - any further comments from others? |
|
Jenkins retest this please |
|
Test build #85524 has finished for PR 19715 at commit
|
|
Merged to master. If there are any further small comments / clean ups we can do that during QA for 2.3 Thanks @huaxingao and all others for review! |
|
Thank you all for your help!! |
What changes were proposed in this pull request?
add multi columns support to QuantileDiscretizer.
When calculating the splits, we can either merge together all the probabilities into one array by calculating approxQuantiles on multiple columns at once, or compute approxQuantiles separately for each column. After doing the performance comparision, we found it’s better to calculating approxQuantiles on multiple columns at once.
Here is how we measuring the performance time:
Here is the performance test result:
How was this patch tested?
add UT in QuantileDiscretizerSuite to test multi columns supports