[SPARK-25245][DOCS][SS] Explain regarding limiting modification on "spark.sql.shuffle.partitions" for structured streaming #22238
Conversation
Test build #95268 has finished for PR 22238 at commit
> # Additional Information
>
> **Gotchas**
hmmmmm .. @HeartSaVioR how about leaving them in the code or API docs somewhere as a note?
IMO, it would be better to keep it here as well as in the code; we may not be able to surface it in the right API docs, and there is a chance for users to miss it.
@HeartSaVioR, maybe add an example here to illustrate how to use coalesce?
I was going to add the explanation to doc() of spark.sql.shuffle.partitions, but it looks like what we explain in doc() would not be published automatically. (Please correct me if I'm missing something.) SQLConf is not even exposed to scaladoc. That's why I'm adding this to the structured streaming guide doc. Actually, I think most end users only look at this doc for structured streaming, and we can't (and shouldn't) expect end users to read the source code to find it.
I also hadn't noticed that spark.sql.shuffle.partitions is explained in sql-programming-guide.md, but I still think we need to explain here all configs that work differently with a batch query, and spark.sql.shuffle.partitions is such a case.
Btw, "Gotchas" sounds a bit funny. Maybe having a section would be better, like ## Configuration Options For Structured Streaming in sql-programming-guide.md?
> - For structured streaming, modifying "spark.sql.shuffle.partitions" is restricted once you run the query.
> - This is because state is partitioned via key, hence number of partitions for state should be unchanged.
> - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning. Please note that it will also affect downstream operators.
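The key-based partitioning the quoted text refers to can be sketched in plain Python. This is an illustration only: Spark actually hashes keys with Murmur3, and `hash_partition` below is a hypothetical stand-in helper.

```python
import zlib

def hash_partition(key: str, num_partitions: int) -> int:
    # Deterministically map a key to a state partition, as a stand-in
    # for Spark's Murmur3-based hash partitioning of state keys.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

keys = [f"user-{i}" for i in range(1000)]
before = {k: hash_partition(k, 200) for k in keys}  # 200 shuffle partitions
after = {k: hash_partition(k, 100) for k in keys}   # restarted with 100

# Keys whose partition assignment changed: a restarted query with a
# different partition count would look for their state in the wrong place.
moved = [k for k in keys if before[k] != after[k]]
```

This is why the partition count is baked into the checkpointed state and must not change across restarts.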
An example of how to use the coalesce operator with a stateful streaming query would be superb.
I'd also appreciate it if you added what type of downstream operators are affected, and how.
It just means that the number of partitions in a stateful operation's output will be the same as the parameter of coalesce, and that number of partitions will be kept unless another shuffle happens. Without coalesce it is implicitly the same as spark.sql.shuffle.partitions, whose default value is 200.
I'll add the code, but I'm not sure we need the code per language (Scala / Java / Python tabs) since they would be the same.
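A plain-Python sketch of the idea discussed above (hypothetical helper, not Spark's implementation): `coalesce` merges existing partitions into fewer buckets instead of redistributing rows by key, so no shuffle is needed.

```python
def coalesce(partitions, num_output):
    # Merge existing partitions into fewer buckets by concatenation,
    # without recomputing any key-to-partition assignment (no shuffle).
    merged = [[] for _ in range(num_output)]
    for i, part in enumerate(partitions):
        merged[i % num_output].extend(part)
    return merged

# 200 input partitions (the spark.sql.shuffle.partitions default),
# reduced to 10 output partitions.
parts = [[f"row-{i}"] for i in range(200)]
small = coalesce(parts, 10)
```

Every row survives and no key changes its grouping; the operation only reduces how many tasks run, which is also why downstream operators are affected until the next shuffle.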
Also adding @tdas @zsxwing @jose-torres to cc.
> This section is for configurations which are only available for structured streaming, or they behave differently with batch query.
>
> - spark.sql.shuffle.partitions
We do have it in sql-programming-guide.md. Shall we add some info there for now?
IMHO, if something goes wrong with structured streaming, end users would review the structured streaming guide doc rather than the SQL programming guide doc. Could we wait to hear more voices on this?
What I am worried about is adding a new section, which is quite unusual. Usually we go for it when multiple instances are detected later.
Are there more instances to describe here specifically for structured streaming?
I can revert adding a new section if you meant adding ## on it. Since "Gotchas" sounds a bit funny, I will change it to **Notes**.
The rationale for adding this to the doc is that this restriction has caused a lot of wondering on SO and the user mailing list, and even a patch trying to "fix" it. So it would be nice for all end users of structured streaming to see it at least once, even if they only skim the doc, so that they remember it and can revisit the doc once they get stuck on this.
Test build #95321 has finished for PR 22238 at commit
> # Additional Information
>
> **Notes**
I was thinking of adding this information only somewhere in the API or configuration docs. For instance, notes like #19617.

> lots of wondering around SO and user mailing list,

I don't object to noting this stuff, but usually the site has only the key points for features or configurations.
If there are more instances to describe specifically for structured streaming (where the same SQL configurations could lead to confusion), I am fine with adding this. If not, or if we are less sure for now, I would add them to the API's doc or the configuration's doc.
> - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning.
> - e.g. `df.groupBy("time").count().coalesce(10)` reduces the number of tasks by 10, whereas `spark.sql.shuffle.partitions` may be bigger.
> - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens.
> - `spark.sql.streaming.stateStore.providerClass`
Ah, okay, so there are more instances to describe here. If so, I'm okay.
Test build #95326 has finished for PR 22238 at commit
> val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
> - .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
> + .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
> + "Note: For structured streaming, this configuration cannot be changed between query " +
s/cannot be/must not be/
The sentence is borrowed from the existing one in spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (lines 978 to 979 in 8198ea5):

> "Note: This configuration cannot be changed between query restarts from the same " +
> "checkpoint location.")
> "The class used to manage state data in stateful streaming queries. This class must " +
> - "be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
> + "be a subclass of StateStoreProvider, and must have a zero-arg constructor. " +
> + "Note: For structured streaming, this configuration cannot be changed between query " +
s/cannot be/must not be/
Same here.
Could we revisit this? This is a simple doc change that helps end users troubleshoot something that could be confusing without knowing the details.
srowen left a comment
Not as sure about the docs additions, but the additional info in param descriptions seems handy.
> **Notes**
>
> - There're couple of configurations which are not modifiable once you run the query. If you really want to make changes for these configurations, you have to discard checkpoint and start a new query.
There're -> "There are a". This should probably be reworded though; doesn't need the 2nd person. "Several configurations are not modifiable after the query has run. To change them, discard the checkpoint and start a new query. These configurations include:"
I'm not sure how much we need to explain why they're not modifiable, but suggesting alternatives is good. I think the example is redundant below.
The reworded expression looks shorter and better. Will address.

> I'm not sure how much we need to explain why they're not modifiable

Yeah, I agree everyone has a different view on this. Someone could think the page is a guide, so it doesn't need to include implementation details. Others (including me) could think the information is described nowhere else, so it needs to be included here, especially since this is the major difference between a batch query and a streaming query.
No strong opinion (verbosity vs. less wondering for end users), so I can follow the decision. If you think this is redundant, please let me know so that I can address it.

> I think the example is redundant below.

Will address.
> - e.g. `df.groupBy("time").count().coalesce(10)` reduces the number of tasks by 10, whereas `spark.sql.shuffle.partitions` may be bigger.
> - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens.
> - `spark.sql.streaming.stateStore.providerClass`
> - To read previous state of the query properly, the class of state store provider should be unchanged.
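The zero-arg-constructor requirement on the provider class exists because the provider is instantiated reflectively from the configured class name. A minimal Python sketch of that pattern (illustrative only; `load_provider` is a hypothetical helper, and Spark does this with JVM reflection in Scala):

```python
import importlib

def load_provider(class_path: str):
    # Resolve "pkg.module.ClassName" to a class object and instantiate
    # it with a zero-arg constructor, mirroring how a configured
    # provider class would be loaded by name.
    module_name, _, class_name = class_path.rpartition(".")
    cls = getattr(importlib.import_module(module_name), class_name)
    return cls()

# Any importable class with a zero-arg constructor works here:
provider = load_provider("collections.OrderedDict")
```

If a checkpoint was written by one provider class and the query restarts configured with another, the new provider cannot be assumed to understand the old on-disk state format, hence the restriction.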
To read the previous state, etc. These also don't need to be sub bullet points?
Will concatenate the two lines via `:` and add "the".
> - There're couple of configurations which are not modifiable once you run the query. If you really want to make changes for these configurations, you have to discard checkpoint and start a new query.
> - `spark.sql.shuffle.partitions`
> - This is due to the physical partitioning of state: state is partitioned via applying hash function to key, hence the number of partitions for state should be unchanged.
> - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning.
less -> fewer
Nice finding. Will address.
Test build #100212 has finished for PR 22238 at commit
> - Several configurations are not modifiable after the query has run. To change them, discard the checkpoint and start a new query. These configurations include:
> - `spark.sql.shuffle.partitions`
> - This is due to the physical partitioning of state: state is partitioned via applying hash function to key, hence the number of partitions for state should be unchanged.
Not sure we need sub- and sub-sub-lists here, but I don't mind much. Do end users need to know why? Those details are more often relevant in source code docs.
If I'm one of the end users (and I am), I'd wonder why I'm restricted from changing these parameters. There are fairly many developers who don't blindly accept anything and want to understand. I'm unsure whether these details belong in a guide doc, but we also don't have docs explaining such details, or an FAQ/troubleshooting page, so I'm not sure where else to put them.
If we are OK with adding these details to the doc of the config in source code (like below), I'm OK with that. Otherwise I'm not sure users can search for and find the description in source code.
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (lines 265 to 268 in 114d0de):

> val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
>   .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
>   .intConf
>   .createWithDefault(200)
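The `buildConf` chain quoted above is a fluent builder pattern. A toy Python equivalent (hypothetical names, for illustration only; not Spark's actual API):

```python
class ConfBuilder:
    """Toy stand-in for SQLConf's buildConf builder chain."""

    def __init__(self, key):
        self.key = key
        self._doc = ""

    def doc(self, text):
        self._doc = text
        return self  # return self so calls can be chained

    def int_conf(self):
        return self  # in Spark this step fixes the value type

    def create_with_default(self, default):
        # Terminal step: materialize the config entry.
        return {"key": self.key, "doc": self._doc, "default": default}

shuffle_partitions = (
    ConfBuilder("spark.sql.shuffle.partitions")
    .doc("The default number of partitions to use when shuffling data.")
    .int_conf()
    .create_with_default(200)
)
```

Extending the `.doc(...)` string, as this PR does, is therefore a purely additive change to the materialized config entry.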
(Chiming in to say that I've definitely seen end users get confused and want to know why this restriction is so.)
Sounds good to me, leave it in
Merged to master
Thanks all for reviewing and merging!
[SPARK-25245][DOCS][SS] Explain regarding limiting modification on "spark.sql.shuffle.partitions" for structured streaming

Closes apache#22238 from HeartSaVioR/SPARK-25245.
Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
What changes were proposed in this pull request?
This patch adds an explanation of why "spark.sql.shuffle.partitions" must stay unchanged in structured streaming, which a couple of users have already wondered about, and some even thought was a bug. It helps other end users learn about this behavior before they discover it themselves and are left wondering.
How was this patch tested?
No tests needed, because this is a simple addition to the guide doc, checked with a markdown editor.