**docs/structured-streaming-programming-guide.md** (10 additions, 0 deletions)

@@ -2812,6 +2812,16 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f

# Additional Information

**Notes**
**Member:**

I was thinking of adding this information to the API or configuration docs only; for instance, notes like #19617.

There is a lot of wandering around SO and the user mailing list, and I don't object to noting these things, but usually the site has only the key points for some features or configurations.

If there are more instances to describe specifically for structured streaming (where the same SQL configurations could lead to some confusion), I am fine with adding this. If not, or if we are less sure for now, I would add them to the API docs or the configuration docs.


- Several configurations are not modifiable after the query has run. To change them, discard the checkpoint and start a new query. These configurations include:
  - `spark.sql.shuffle.partitions`
    - This is due to the physical partitioning of state: state is partitioned by applying a hash function to the key, hence the number of partitions for state should remain unchanged.
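The explanation above can be sketched in a few lines of plain Scala (illustration only, not Spark's actual routing code, which uses a Murmur3-based hash): a key's state partition is derived from `hash(key) % numPartitions`, so restarting with a different partition count would look for existing state in the wrong partition.

```scala
object StatePartitioningSketch {
  // Route a key to a state partition, hash-partitioning style.
  // Plain hashCode stands in for Spark's Murmur3-based hash.
  def partitionFor(key: String, numPartitions: Int): Int =
    math.floorMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val keys = (0 until 100).map(i => s"user-$i")
    // Count keys whose partition changes when the count goes 200 -> 300:
    // their previously written state would no longer be found on restart.
    val relocated = keys.count(k => partitionFor(k, 200) != partitionFor(k, 300))
    println(s"$relocated of ${keys.size} keys would look for state in the wrong partition")
  }
}
```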
**Member:**

Not sure we need sub- and sub-sub-lists here, but I don't mind much. Do end users need to know why? Those details are more often relevant in source-code docs.

**Contributor Author:**

If I'm one of the end users (and I am), I'd wonder why I'm restricted from changing these parameters. There are fairly many developers who don't blindly accept things and want to understand. I'm wondering whether these details are OK to include in the guide doc, but we also don't have a doc explaining such details, or an FAQ/troubleshooting page, so I'm not sure where to put them.

If we are OK with adding these details to the doc of the config in source code (like below), I'm OK with it. Otherwise I'm not sure users can search for and find the description in the source code.

```scala
val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
  .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
  .intConf
  .createWithDefault(200)
```

**Contributor:**

(Chiming in to say that I've definitely seen end users get confused and want to know why this restriction exists.)

**Member:**

Sounds good to me; leave it in.

    - If you want to run fewer tasks for stateful operations, `coalesce` helps avoid unnecessary repartitioning.
      - After `coalesce`, the reduced number of tasks is kept unless another shuffle happens.
  - `spark.sql.streaming.stateStore.providerClass`: to read the previous state of the query properly, the class of the state store provider should remain unchanged.
  - `spark.sql.streaming.multipleWatermarkPolicy`: modifying this would lead to an inconsistent watermark value when the query contains multiple watermarks, hence the policy should remain unchanged.
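A hedged sketch of what this policy decides (object and method names are illustrative, not Spark's internals): with multiple watermarked inputs, the global watermark is either the minimum of the per-input watermarks (`min`, the default, which waits for the slowest input) or the maximum (`max`, which advances faster but may treat the slower input's rows as late). Flipping the policy across restarts would make the global watermark jump, which is why it must stay fixed.

```scala
object WatermarkPolicySketch {
  // Combine per-input watermarks (ms since epoch) into one global watermark.
  // "min" never drops data that any input still considers live; "max"
  // advances faster but may classify the slower input's data as late.
  def globalWatermark(inputWatermarks: Seq[Long], policy: String): Long =
    policy match {
      case "min" => inputWatermarks.min
      case "max" => inputWatermarks.max
      case other => throw new IllegalArgumentException(s"unknown policy: $other")
    }

  def main(args: Array[String]): Unit = {
    val wms = Seq(1000L, 5000L) // two streams, one lagging behind
    println(globalWatermark(wms, "min")) // waits for the slow stream
    println(globalWatermark(wms, "max")) // slow stream's data may be late
  }
}
```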

**Further Reading**

- See and run the
@@ -266,7 +266,9 @@ object SQLConf {
```diff
   .createWithDefault(Long.MaxValue)

 val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
-  .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
+  .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
+    "Note: For structured streaming, this configuration cannot be changed between query " +
```
**Contributor:**

s/cannot be/must not be/

**Contributor Author:**

The sentence is borrowed from the existing one:

```scala
"Note: This configuration cannot be changed between query restarts from the same " +
"checkpoint location.")
```

```diff
+    "restarts from the same checkpoint location.")
   .intConf
   .createWithDefault(200)
```

@@ -868,7 +870,9 @@ object SQLConf {
```diff
   .internal()
   .doc(
     "The class used to manage state data in stateful streaming queries. This class must " +
-    "be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
+    "be a subclass of StateStoreProvider, and must have a zero-arg constructor. " +
+    "Note: For structured streaming, this configuration cannot be changed between query " +
```
**Contributor:**

s/cannot be/must not be/

**Contributor Author:**

Same here.

```diff
+    "restarts from the same checkpoint location.")
   .stringConf
   .createWithDefault(
     "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
```
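As an illustration of why the provider class is pinned to a checkpoint, here is a hypothetical compatibility check (not Spark's actual code; `CheckpointMeta` and `canRestart` are made up for this sketch): each provider defines its own on-disk state format, so state written by one provider cannot be read back by another.

```scala
object ProviderCompatSketch {
  // Hypothetical model: a checkpoint remembers which provider wrote its state.
  final case class CheckpointMeta(providerClass: String)

  // A restarted query can only reuse state written by the same provider,
  // since each provider defines its own on-disk state format.
  def canRestart(meta: CheckpointMeta, configuredProvider: String): Boolean =
    meta.providerClass == configuredProvider

  def main(args: Array[String]): Unit = {
    val hdfs =
      "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"
    val meta = CheckpointMeta(hdfs)
    println(canRestart(meta, hdfs))             // same provider: state readable
    println(canRestart(meta, "MyProvider"))     // different: discard checkpoint first
  }
}
```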