13 changes: 13 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -2812,6 +2812,19 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f

# Additional Information

**Notes**
**Member:**

I was thinking of adding this information only somewhere in the API or configuration docs. For instance, notes like #19617.

> lots of wondering around SO and user mailing list,

I don't object to noting these things, but usually the site has only key points for some features or configurations.

If there are more instances to describe specifically for structured streaming (where the same SQL configurations could lead to confusion), I am fine with adding this. If not, or if we are less sure for now, I would add them to the API docs or the configuration docs.


- There're couple of configurations which are not modifiable once you run the query. If you really want to make changes for these configurations, you have to discard checkpoint and start a new query.
**Member:**

There're -> "There are a". This should probably be reworded though; doesn't need the 2nd person. "Several configurations are not modifiable after the query has run. To change them, discard the checkpoint and start a new query. These configurations include:"

I'm not sure how much we need to explain why they're not modifiable, but suggesting alternatives is good. I think the example is redundant below.

**Contributor Author:**

The reworded expression looks shorter and better. Will address.

> I'm not sure how much we need to explain why they're not modifiable

Yeah, I agree this is something everyone has a different view on. Some could think the page is a guide, so it doesn't need to include implementation details. Others (including me) could think the information is described nowhere, so it needs to be included here, especially since this is a major difference between batch and streaming queries.

No strong opinion (verbosity vs. less wondering for end users), so I can follow the decision. If you think this is redundant, please let me know so that I can address it.

> I think the example is redundant below.

Will address.

- `spark.sql.shuffle.partitions`
- This is due to the physical partitioning of state: state is partitioned via applying hash function to key, hence the number of partitions for state should be unchanged.
**Member:**

Not sure we need sub- and sub-sub-lists here, but I don't mind much. Do end users need to know why? Those details are more often relevant in source-code docs.

**Contributor Author:**

If I were one of the end users (and I am), I'd wonder why I'm restricted from changing these parameters. There are fairly many developers who don't blindly accept anything and want to understand. I'm wondering whether these details are OK to include in the guide doc, but we also don't have docs explaining the details, or a FAQ/troubleshooting page, so I'm not sure where to put them.

If we are OK with adding these details to the config's doc string in the source code (like below), I'm OK with it. Otherwise, I'm not sure end users can search for and find the description in the source code.

val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
.doc("The default number of partitions to use when shuffling data for joins or aggregations.")
.intConf
.createWithDefault(200)

**Contributor:**

(Chiming in to say that I've definitely seen end users get confused and want to know why this restriction is so.)

**Member:**

Sounds good to me, leave it in

- If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning.
**Member:**

less -> fewer

**Contributor Author:**

Nice catch. Will address.

- e.g. `df.groupBy("time").count().coalesce(10)` reduces the number of tasks to 10, even though `spark.sql.shuffle.partitions` may be bigger.
- After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens.
- `spark.sql.streaming.stateStore.providerClass`
**Member:**

Ah, okay, so there are more instances to describe here. If so, I'm okay.

- To read previous state of the query properly, the class of state store provider should be unchanged.
**Member:**

To read the previous state, etc. Also, these don't need to be sub-bullet points?

**Contributor Author:**

Will concatenate the two lines via `:` and add "the".

- `spark.sql.streaming.multipleWatermarkPolicy`
- Modification of this would lead to inconsistent watermark values when the query contains multiple watermarks, hence the policy should be unchanged.
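To make the notes above concrete, here is a minimal sketch of pinning these configurations before a checkpointed query's first start. Everything here is illustrative: the `rate` source, the checkpoint path, and the chosen values are stand-ins, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("checkpoint-pinned-confs") // illustrative app name
  .getOrCreate()

// Set the checkpoint-pinned configurations BEFORE the first run of the
// query; on restarts from the same checkpoint they must stay unchanged.
spark.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")

val counts = spark.readStream
  .format("rate") // built-in test source emitting (timestamp, value) rows
  .load()
  .groupBy("value")
  .count()
  .coalesce(10) // fewer tasks for the stateful stage, without another shuffle

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/demo-checkpoint") // hypothetical path
  .start()
```

On a restart, reuse the same checkpoint location and the same values for these configurations; to change them, discard the checkpoint and start a new query.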

**Further Reading**

- See and run the
@@ -266,7 +266,9 @@ object SQLConf {
.createWithDefault(Long.MaxValue)

val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
.doc("The default number of partitions to use when shuffling data for joins or aggregations.")
.doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
"Note: For structured streaming, this configuration cannot be changed between query " +
**Contributor:**

s/cannot be/must not be/

**Contributor Author:**

The sentence is borrowed from the existing one:

"Note: This configuration cannot be changed between query restarts from the same " +
"checkpoint location.")

"restarts from the same checkpoint location.")
.intConf
.createWithDefault(200)

@@ -868,7 +870,9 @@ object SQLConf {
.internal()
.doc(
"The class used to manage state data in stateful streaming queries. This class must " +
"be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
"be a subclass of StateStoreProvider, and must have a zero-arg constructor. " +
"Note: For structured streaming, this configuration cannot be changed between query " +
**Contributor:**

s/cannot be/must not be/

**Contributor Author:**

Same here.

"restarts from the same checkpoint location.")
.stringConf
.createWithDefault(
"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
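For readers unfamiliar with the `buildConf` chain edited above, a toy re-implementation of the fluent builder pattern may help. This is NOT Spark's real `ConfigBuilder`; the names and types below are simplified stand-ins for illustration only.

```scala
// Toy sketch of the fluent config-builder pattern used by SQLConf.
// Hypothetical simplified types -- not Spark's actual ConfigBuilder API.
final case class ConfEntry[T](key: String, doc: String, default: T)

final class TypedBuilder[T](key: String, doc: String) {
  // Finalizes the entry with a default value.
  def createWithDefault(default: T): ConfEntry[T] = ConfEntry(key, doc, default)
}

final class ConfBuilder(key: String, docText: String = "") {
  // Each step returns a builder, so calls chain fluently.
  def doc(text: String): ConfBuilder = new ConfBuilder(key, text)
  def intConf: TypedBuilder[Int] = new TypedBuilder[Int](key, docText)
  def stringConf: TypedBuilder[String] = new TypedBuilder[String](key, docText)
}

def buildConf(key: String): ConfBuilder = new ConfBuilder(key)

// Mirrors the shape of the SHUFFLE_PARTITIONS definition in the diff.
val shufflePartitions = buildConf("spark.sql.shuffle.partitions")
  .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
    "Note: For structured streaming, this configuration cannot be changed between query " +
    "restarts from the same checkpoint location.")
  .intConf
  .createWithDefault(200)
```

The real builder carries more state (type converters, validation, `.internal()` marking), but the chaining shape is the same, which is why the doc-string change in this PR is just an extra string concatenated into the `.doc(...)` argument.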