[SPARK-40883][CONNECT] Support Range in Connect proto #38347
Conversation
R: @cloud-fan

Can one of the admins verify this patch?
end is not optional, but how do we know if the client forgets to set it? 0 is a valid end value as well.
Yeah, this becomes tricky. Ultimately we could wrap every such field in a message so we always know whether that field is set. However, that might complicate the entire proto too much. Let's have a discussion on that.
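For illustration only, here is a minimal Scala sketch of the wrapper-message idea under discussion. The generated package name and the has*/get* accessors follow standard protobuf-java conventions and this PR's proto, but they are assumptions rather than a final design:

```scala
import org.apache.spark.connect.proto  // assumed package of the generated classes

// With a plain scalar field (e.g. `int32 end = 2;`) proto3 cannot distinguish
// "not set" from the default, so getEnd() returns 0 either way.
// A message-typed field such as `Step step = 3;` does get a has* accessor,
// so the server can detect that the client never set it.
val range = proto.Range.newBuilder()
  .setStart(0)
  .setEnd(10)
  // step deliberately left unset
  .build()

val step =
  if (range.hasStep) range.getStep.getStep // nested field name is an assumption
  else 1                                   // fall back to the documented default of 1
```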
we can call session.leafNodeDefaultParallelism
...nect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectSessionBasedSuite.scala
PR should be ready for review again.
How about we call spark.range(10).toDF? Then we don't need to add comparePlansDatasetLong.
Let me try it and see if it gives an exact plan match.
Another idea might be to just compare the results through collect(), so we do not compare the plans in this case.
Oh, .toDF() just converts things into a DataFrame.
comparePlansDatasetLong has been removed.
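As a rough illustration of the collect()-based comparison idea mentioned above (assertSameResult is a hypothetical helper, not something added in this PR):

```scala
import org.apache.spark.sql.DataFrame

// Instead of comparing logical plans, compare the rows the two plans produce.
def assertSameResult(actual: DataFrame, expected: DataFrame): Unit = {
  assert(actual.collect().toSeq == expected.collect().toSeq)
}

// e.g. the Dataset built from the Connect Range proto should yield the same
// rows as spark.range(10).toDF().
```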
Conflict resolved.
end: Int,
step: Option[Int],
numPartitions: Option[Int]): Relation = {
  val range = proto.Range.newBuilder()
Note that I need to keep the proto.Range prefix, as Range itself is a built-in Scala class, so we need proto. to differentiate in this special case.
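A tiny sketch of the name clash, assuming the generated classes live under org.apache.spark.connect.proto:

```scala
import org.apache.spark.connect.proto

// Without the prefix, `Range` resolves to scala.collection.immutable.Range
// (aliased in the scala package object), not to the generated protobuf class.
val scalaRange = Range(0, 10)                      // built-in Scala Range
val protoRange = proto.Range.newBuilder().build()  // Spark Connect Range message
```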
I've been explicitly requested this a couple of times already: as a coding style, always prefix the proto-generated classes with the proto. prefix. I know it uses a bit more horizontal space, but it always makes clear where a particular element comes from, which is tremendously useful because we consistently use types from both the Catalyst API and Spark Connect in the same code paths.
It makes sense for SparkConnectPlanner, where Catalyst and proto types are mixed together, and we are keeping the approach you asked for there.
However, this is the Connect DSL, which only deals with protos; no Catalyst is included in this package:
spark/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
Line 17 in 9fc3aa0
package org.apache.spark.sql.connect
As long as no Catalyst is in this package, this is good with me. Thanks for clarifying.
Is this really the best way to express the optionality?
There are two dimensions to things in this area:
- Required versus optional: a required field must be set, while an optional field may or may not be set.
- Default value or not: a field can have a default value that is used when it is not set.

The second point builds on the first: if a field is not set, there may be a default value to fall back on.
There are special cases where the proto default value is the same as the default value Spark uses; in those cases we don't need to differentiate the optionality. Otherwise we need this approach to tell set apart from not set, so that we can adopt Spark's default values (unless we don't care about Spark's defaults).
To really answer your question: if we plan to respect Spark's default values for those optional fields whose proto default values differ from Spark's, then this is the only way to do so.
So, in fewer words :) when num_partitions is a plain integer, its default value is 0 even when it is not set, and for scalar types we can't differentiate between present and not present. Having to decide whether 0 is a valid or invalid value defeats the purpose.
Thanks for the additional color!
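To make the trade-off concrete, here is a minimal sketch of how a planner could honor Spark's default when a wrapped field is unset. resolveNumPartitions is hypothetical, and the hasNumPartitions/getNumPartitions accessors are assumed from protobuf-java naming for a nested NumPartitions message:

```scala
import org.apache.spark.connect.proto
import org.apache.spark.sql.SparkSession

// If the client set num_partitions, use it; otherwise fall back to Spark's own
// default rather than proto's scalar default of 0.
def resolveNumPartitions(range: proto.Range, session: SparkSession): Int = {
  if (range.hasNumPartitions) {
    range.getNumPartitions.getNumPartitions
  } else {
    session.leafNodeDefaultParallelism // Spark-side default, as suggested earlier
  }
}
```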
thanks, merging to master!
int32 start = 1;
int32 end = 2;
// Optional. Default value = 1
Step step = 3;
start, end, step should use int64 @amaliujia
Yes, let me follow up. I guess I was somehow looking at the Python-side API and confused myself about the types.
Updating in #38460.
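For context on why int64 is the better fit, a small sketch against the public Dataset API (the values are arbitrary, and an active SparkSession named spark is assumed):

```scala
// SparkSession.range takes Long for start, end, and step, so an int32 proto
// field cannot carry every value a client may legitimately pass.
val df = spark.range(0L, 3L * Int.MaxValue, 1L, 4)  // end is far beyond Int.MaxValue
```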
### What changes were proposed in this pull request?
1. Support `Range` in Connect proto.
2. Refactor `SparkConnectDeduplicateSuite` to `SparkConnectSessionBasedSuite`.

### Why are the changes needed?
Improve API coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes apache#38347 from amaliujia/add_range.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>