From ec24f2941021d34058d7af3d3f103094582fd79d Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Mon, 27 Aug 2018 06:08:04 +0900
Subject: [PATCH 1/4] SPARK-25245 Explain regarding limiting modification on
 "spark.sql.shuffle.partitions" for structured streaming

---
 docs/structured-streaming-programming-guide.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 73de1892977a..921051f8ae5f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2812,6 +2812,12 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f
 
 # Additional Information
 
+**Gotchas**
+
+- For structured streaming, modifying "spark.sql.shuffle.partitions" is restricted once you run the query.
+  - This is because state is partitioned via key, hence number of partitions for state should be unchanged.
+  - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning. Please note that it will also affect downstream operators.
+
 **Further Reading**
 
 - See and run the
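The `coalesce` workaround described in this patch can be sketched as follows. This is a minimal word-count example; the socket source, host/port, and console sink are illustrative stand-ins, not anything the patch prescribes:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-workaround-sketch").getOrCreate()

// Any streaming source would do; a local socket keeps the sketch small.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// groupBy/count is stateful: the shuffle in front of the state store uses
// spark.sql.shuffle.partitions partitions, fixed once the query first runs.
// coalesce(10) lets the stateful stage run with 10 tasks without another
// repartitioning, though it also affects downstream operators, as the
// patch notes.
val counts = lines.groupBy("value").count().coalesce(10)

counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
```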
From 138cc63e639b60fb7e803097654816ad6c19c95f Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Tue, 28 Aug 2018 08:44:53 +0900
Subject: [PATCH 2/4] Address review comments

---
 docs/structured-streaming-programming-guide.md  | 15 ++++++++++-----
 .../org/apache/spark/sql/internal/SQLConf.scala |  5 ++++-
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 921051f8ae5f..616a897ee2d0 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2812,13 +2812,18 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f
 
 # Additional Information
 
-**Gotchas**
+## Configuration Options For Structured Streaming
 
-- For structured streaming, modifying "spark.sql.shuffle.partitions" is restricted once you run the query.
-  - This is because state is partitioned via key, hence number of partitions for state should be unchanged.
-  - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning. Please note that it will also affect downstream operators.
+This section is for configurations which are only available for structured streaming, or they behave differently with batch query.
 
-**Further Reading**
+- spark.sql.shuffle.partitions
+  - This configuration is not modifiable once you run the structured streaming query.
+  - This is due to the physical partitioning of state: state is partitioned via applying hash function to key, hence the number of partitions for state should be unchanged.
+  - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning.
+    - e.g. `df.groupBy("time").count().coalesce(10)` reduces the number of tasks by 10, whereas `spark.sql.shuffle.partitions` may be bigger.
+    - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens.
+
+## Further Reading
 
 - See and run the
   [Scala]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming)/[R]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ef3ce98fd7ad..d925be77792c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -266,7 +266,10 @@ object SQLConf {
     .createWithDefault(Long.MaxValue)
 
   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
-    .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
+    .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
+      "In structured streaming, the value is not modifiable once you run the query. This is due " +
+      "to the physical partitioning of state: state is partitioned via applying hash function " +
+      "to key, hence the number of partitions for state should be unchanged.")
     .intConf
     .createWithDefault(200)
 
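The `df.groupBy("time").count().coalesce(10)` example added in this patch can be fleshed out into a runnable sketch. The rate source, the renamed column, and the checkpoint path below are assumptions made for illustration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // Takes effect when the query first runs against a fresh checkpoint
  // location; it cannot be changed for that checkpoint afterwards.
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()

// The rate source emits (timestamp, value) rows and stands in for a real input.
val df = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
  .withColumnRenamed("timestamp", "time")

// State is still hash-partitioned into 200 pieces, but after coalesce the
// stateful stage runs only 10 tasks, and the reduced task count is kept
// downstream until another shuffle happens.
val query = df.groupBy("time").count().coalesce(10)
  .writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/shuffle-partitions-demo")
  .start()
```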
From e2ee43da2f9bf4fb95c938764ee3584bbae06c1b Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Tue, 28 Aug 2018 13:22:29 +0900
Subject: [PATCH 3/4] Replace section with bold string, changed section's title
 to "note"

---
 .../structured-streaming-programming-guide.md | 26 ++++++++++---------
 .../apache/spark/sql/internal/SQLConf.scala   |  9 ++++---
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 616a897ee2d0..1f17158e3978 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2812,18 +2812,20 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f
 
 # Additional Information
 
-## Configuration Options For Structured Streaming
-
-This section is for configurations which are only available for structured streaming, or they behave differently with batch query.
-
-- spark.sql.shuffle.partitions
-  - This configuration is not modifiable once you run the structured streaming query.
-  - This is due to the physical partitioning of state: state is partitioned via applying hash function to key, hence the number of partitions for state should be unchanged.
-  - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning.
-    - e.g. `df.groupBy("time").count().coalesce(10)` reduces the number of tasks by 10, whereas `spark.sql.shuffle.partitions` may be bigger.
-    - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens.
-
-## Further Reading
+**Notes**
+
+- There're couple of configurations which are not modifiable once you run the query. If you really want to make changes for these configurations, you have to discard checkpoint and start a new query.
+  - `spark.sql.shuffle.partitions`
+    - This is due to the physical partitioning of state: state is partitioned via applying hash function to key, hence the number of partitions for state should be unchanged.
+    - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning.
+      - e.g. `df.groupBy("time").count().coalesce(10)` reduces the number of tasks by 10, whereas `spark.sql.shuffle.partitions` may be bigger.
+      - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens.
+  - `spark.sql.streaming.stateStore.providerClass`
+    - To read previous state of the query properly, the class of state store provider should be unchanged.
+  - `spark.sql.streaming.multipleWatermarkPolicy`
+    - Modification of this would lead inconsistent watermark value when query contains multiple watermarks, hence the policy should be unchanged.
+
+**Further Reading**
 
 - See and run the
   [Scala]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming)/[R]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d925be77792c..ca100da9f019 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -267,9 +267,8 @@ object SQLConf {
 
   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
     .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
-      "In structured streaming, the value is not modifiable once you run the query. This is due " +
-      "to the physical partitioning of state: state is partitioned via applying hash function " +
-      "to key, hence the number of partitions for state should be unchanged.")
+      "Note: For structured streaming, this configuration cannot be changed between query " +
+      "restarts from the same checkpoint location.")
     .intConf
     .createWithDefault(200)
 
@@ -871,7 +870,9 @@ object SQLConf {
     .internal()
     .doc(
       "The class used to manage state data in stateful streaming queries. This class must " +
-        "be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
+        "be a subclass of StateStoreProvider, and must have a zero-arg constructor. " +
+        "Note: For structured streaming, this configuration cannot be changed between query " +
+        "restarts from the same checkpoint location.")
     .stringConf
     .createWithDefault(
       "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
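The two additional pinned configurations named in this patch are set before the session starts; a minimal sketch, using the built-in default provider class that the patch itself references (any custom value must still be a subclass of `StateStoreProvider` with a zero-arg constructor):

```scala
import org.apache.spark.sql.SparkSession

// Both settings must stay the same across restarts from the same
// checkpoint location.
val spark = SparkSession.builder()
  .appName("pinned-confs-sketch")
  // HDFSBackedStateStoreProvider is the built-in default named in the patch.
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
  // "min" is the default policy; "max" is the other supported value.
  .config("spark.sql.streaming.multipleWatermarkPolicy", "min")
  .getOrCreate()
```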
" + + "Note: For structured streaming, this configuration cannot be changed between query " + + "restarts from the same checkpoint location.") .stringConf .createWithDefault( "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider") From bb45c2632f7beb6e0fba78cf2c55233c271eefd8 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 17 Dec 2018 10:57:03 +0900 Subject: [PATCH 4/4] Address @srowen review comments --- docs/structured-streaming-programming-guide.md | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 1f17158e3978..8c3622c85724 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2814,16 +2814,13 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f **Notes** -- There're couple of configurations which are not modifiable once you run the query. If you really want to make changes for these configurations, you have to discard checkpoint and start a new query. +- Several configurations are not modifiable after the query has run. To change them, discard the checkpoint and start a new query. These configurations include: - `spark.sql.shuffle.partitions` - This is due to the physical partitioning of state: state is partitioned via applying hash function to key, hence the number of partitions for state should be unchanged. - - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning. - - e.g. `df.groupBy("time").count().coalesce(10)` reduces the number of tasks by 10, whereas `spark.sql.shuffle.partitions` may be bigger. + - If you want to run fewer tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning. - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens. - - `spark.sql.streaming.stateStore.providerClass` - - To read previous state of the query properly, the class of state store provider should be unchanged. - - `spark.sql.streaming.multipleWatermarkPolicy` - - Modification of this would lead inconsistent watermark value when query contains multiple watermarks, hence the policy should be unchanged. + - `spark.sql.streaming.stateStore.providerClass`: To read the previous state of the query properly, the class of state store provider should be unchanged. + - `spark.sql.streaming.multipleWatermarkPolicy`: Modification of this would lead inconsistent watermark value when query contains multiple watermarks, hence the policy should be unchanged. **Further Reading**