-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22346][ML] VectorSizeHint Transformer for using VectorAssembler in StructuredSteaming #19746
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #83850 has finished for PR 19746 at commit
|
|
Test build #83859 has finished for PR 19746 at commit
|
|
Test build #83860 has finished for PR 19746 at commit
|
|
Test build #83869 has finished for PR 19746 at commit
|
|
Test build #83909 has finished for PR 19746 at commit
|
38e1c5c to
03bd63c
Compare
|
Test build #83918 has finished for PR 19746 at commit
|
WeichenXu123
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I leave some comments. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here can simply use:
val checkVectorSizeUDF = udf { vector: Vector => ...}
checkVectorSizeUDF(col(localInputCol))
So code will be clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The UDF which is possible to throw exception should be marked as nondeterministic, check this PR #19662 for more explanation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here use res.na.drop(Array(localInputCol)) will be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need define a new exception class ? Or directly use SparkException ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use intercept[SparkException] {...} is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made the change. Just out of curiosity, why is intercept better than assertThrows?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't find a test for optimistic option. We should test:
If input dataset vector column do not include metadata, the VectorSizeHint should add metadata with proper size, or input vector column include metadata with different size, the VectorSizeHint should replace it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I talked offline to @jkbradley and I think it's better to throw an exception unless if the column includes metadata & the there is a mismatch between the new and original size.
I've added a new test for this exception and made sure the other tests are run with all handleInvalid cases. Does it look ok now?
|
Test build #84036 has finished for PR 19746 at commit
|
|
Test build #84039 has finished for PR 19746 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add checking for added metadata here ?
And should test if metadata exists, but size do not match, exception will be thrown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I just remove this test? I feel like all of that is tested in the first 3 tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I agree. Other testcases already cover them.
|
Test build #84046 has finished for PR 19746 at commit
|
|
jenkins retest this please |
|
Test build #84089 has finished for PR 19746 at commit
|
|
jenkins retest this please |
|
Test build #84122 has finished for PR 19746 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case (data, transform) ==> case (data, transformer)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use CheckAnswer(expected, expected) will be simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I didn't use CheckAnswer is because there isn't an implicit encoder in testImplicits that handles Vector. I tried CheckAnswer[Vector](expected, expected) but that doesn't work either :(. Is there an encoder that works for Vectors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, sorry, it should be CheckAnswer(Tuple1(expected), Tuple1(expected)). It should work I think.
|
What about supporting multiple columns ? VectorAssembler require multiple input columns, they all need VectorSizeHint to transform first. There's no need to use multiple VectorSizeHint transformer. |
|
@WeichenXu123 From what I've seen, it's more common for people to use VectorAssembler to assemble a bunch of Numeric columns, rather than a bunch of Vector columns. I'd recommend we do things incrementally, adding single-column support before multi-column support (especially since we're still trying to achieve consensus about design for multi-column support, per my recent comment in the umbrella JIRA). |
|
Test build #84449 has finished for PR 19746 at commit
|
|
Test build #84451 has finished for PR 19746 at commit
|
|
reviewing now |
jkbradley
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! I had a number of comments, but they are mostly small ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add :: Experimental :: note here so it shows up properly in docs. Look at other uses of Experimental for examples. (Same for the companion object)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it'd be good to add more docs about why/when people should use this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: always specify type explicitly (There was some better reason for this which I forget...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a docstring and mark with @group param
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mark with @group getParam
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The writing here is formatted strangely. How about:
"How to handle invalid vectors in inputCol. Invalid vectors include nulls and vectors with the wrong size. The options are skip (filter out rows with invalid vectors), error (throw an error) and keep (do not check the vector size, and keep all rows)."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: mismatch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style nit: Call collect() with parentheses
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test keep/optimistic too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you a thought on how to test keep/optimistic. I could verify that the invalid data is not removed but that's a little bit weird to test. It's ensuring that this option allows the column to get into a "bad state" where the metadata doesn't match the contents. Is that what you had in mind?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that's what I had in mind. That is the expected behavior, so we can test that behavior...even if it's not what most use cases would need.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
steaming streaming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just put these in a PipelineModel to avoid using foldLeft.
c3d1c5e to
cafa875
Compare
jkbradley
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few comments based on the updates
| override def copy(extra: ParamMap): this.type = defaultCopy(extra) | ||
| } | ||
|
|
||
| @Experimental |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add Scala docstring here with :: Experimental :: note.
| /** | ||
| * Param for how to handle invalid entries. Invalid vectors include nulls and vectors with the | ||
| * wrong size. The options are `skip` (filter out rows with invalid vectors), `error` (throw an | ||
| * error) and `keep` (do not check the vector size, and keep all rows). `error` by default. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep -> optimistic
| "handleInvalid", | ||
| "How to handle invalid vectors in inputCol. Invalid vectors include nulls and vectors with " + | ||
| "the wrong size. The options are skip (filter out rows with invalid vectors), error " + | ||
| "(throw an error) and keep (do not check the vector size, and keep all rows). `error` by " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep -> optimistic
| .setInputCols(Array("a", "b")) | ||
| .setOutputCol("assembled") | ||
| val pipeline = new Pipeline().setStages(Array(sizeHintA, sizeHintB, vectorAssembler)) | ||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused code?
|
Test build #84880 has finished for PR 19746 at commit
|
|
Test build #85071 has finished for PR 19746 at commit
|
WeichenXu123
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only one minor issue, otherwise LGTM.
| /** | ||
| * Param for how to handle invalid entries. Invalid vectors include nulls and vectors with the | ||
| * wrong size. The options are `skip` (filter out rows with invalid vectors), `error` (throw an | ||
| * error) and `optimistic` (do not check the vector size, and keep all row\). `error` by default. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"row\" ==> "rows"
|
Test build #85134 has finished for PR 19746 at commit
|
|
LGTM |
What changes were proposed in this pull request?
A new VectorSizeHint transformer was added. This transformer is meant to be used as a pipeline stage ahead of VectorAssembler, on vector columns, so that VectorAssembler can join vectors in a streaming context where the size of the input vectors is otherwise not known.
How was this patch tested?
Unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.