
Commit 2c3d961

[SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13978 from tdas/SPARK-16256-1.
1 parent dedbcee commit 2c3d961

1 file changed: +23 −21

docs/structured-streaming-programming-guide.md (23 additions & 21 deletions)
@@ -459,7 +459,7 @@ val csvDF = spark
   .readStream
   .option("sep", ";")
   .schema(userSchema)      // Specify schema of the parquet files
-  .csv("/path/to/directory")    // Equivalent to format("cv").load("/path/to/directory")
+  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}

 </div>
@@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark
   .readStream()
   .option("sep", ";")
   .schema(userSchema)      // Specify schema of the parquet files
-  .csv("/path/to/directory");    // Equivalent to format("cv").load("/path/to/directory")
+  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}

 </div>
@@ -513,7 +513,7 @@ csvDF = spark \
   .readStream() \
   .option("sep", ";") \
   .schema(userSchema) \
-  .csv("/path/to/directory")    # Equivalent to format("cv").load("/path/to/directory")
+  .csv("/path/to/directory")    # Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}

 </div>
@@ -522,10 +522,10 @@ csvDF = spark \
 These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

 ## Operations on streaming DataFrames/Datasets
-You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let’s take a look at a few example operations that you can use.
+You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](sql-programming-guide.html) for more details. Let’s take a look at a few example operations that you can use.

 ### Basic Operations - Selection, Projection, Aggregation
-Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.
+Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are [discussed later](#unsupported-operations) in this section.

 <div class="codetabs">
 <div data-lang="scala" markdown="1">
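
The untyped-to-typed conversion the paragraph above refers to is a one-liner via `Dataset.as[T]`. A minimal Scala sketch, not part of this commit — the `DeviceData` case class, schema, and JSON source path are hypothetical, and a SparkSession named `spark` is assumed:

{% highlight scala %}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types._
import spark.implicits._  // provides the implicit Encoder[DeviceData]

case class DeviceData(device: String, signal: Double, time: java.sql.Timestamp)

// Streaming file sources require the schema up front
val deviceSchema = new StructType()
  .add("device", StringType)
  .add("signal", DoubleType)
  .add("time", TimestampType)

// Untyped streaming DataFrame from a hypothetical JSON directory source
val df: DataFrame = spark.readStream.schema(deviceSchema).json("/path/to/devices")

// Typed streaming Dataset; map, flatMap, etc. are now compile-time checked
val ds: Dataset[DeviceData] = df.as[DeviceData]
{% endhighlight %}
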
@@ -618,7 +618,7 @@ df.groupBy("type").count()
 </div>

 ### Window Operations on Event Time
-Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of, window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration.
+Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration.

 Imagine the quick example is modified and the stream contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time).
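
Expressed in code, the windowed count described above is a grouped aggregation on the built-in `window` function. A minimal Scala sketch, assuming a streaming Dataset `words` with `timestamp` and `word` columns and a SparkSession named `spark`:

{% highlight scala %}
import org.apache.spark.sql.functions.window
import spark.implicits._  // for the $"..." column syntax

// Group by both the sliding window and the word, then count.
// A word received at 12:07 is counted in both the 12:00 - 12:10
// and the 12:05 - 12:15 windows.
val windowedCounts = words
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()
{% endhighlight %}
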
@@ -680,7 +680,7 @@ df.groupBy("type", window("time", "1 minute")).avg("signal")

 Now consider what happens if one of the events arrives late to the application.
 For example, a word that was generated at 12:04 but it was received at 12:11.
-Since this windowing is based on the time in the data, the time 12:04 should considered for windowing. This occurs naturally in our window-based grouping --the late data is automatically placed in the proper windows and the correct aggregates updated as illustrated below.
+Since this windowing is based on the time in the data, the time 12:04 should be considered for windowing. This occurs naturally in our window-based grouping - the late data is automatically placed in the proper windows and the correct aggregates updated as illustrated below.

 ![Handling Late Data](img/structured-streaming-late-data.png)

@@ -724,23 +724,25 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
 </div>

 ### Unsupported Operations
-However, note that all of the operations applicable on static DataFrames/Datasets are not supported in streaming DataFrames/Datasets yet. While some of these unsupported operations will be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. As of Spark 2.0, some of the unsupported operations are as follows
+However, note that all of the operations applicable on static DataFrames/Datasets are not supported in streaming DataFrames/Datasets yet. While some of these unsupported operations will be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting is not supported on the input streaming Dataset, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. As of Spark 2.0, some of the unsupported operations are as follows

-- Multiple aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported
+- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

-- Limit and take first N rows are not supported
+- Limit and take first N rows are not supported on streaming Datasets.

-- Distinct and sorting operations are not supported
+- Distinct operations on streaming Datasets are not supported.

-- Stream-batch outer joins are conditionally supported
+- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

-  + Full outer join not allowed
+- Outer joins between a streaming and a static Datasets are conditionally supported.

-  + Left outer join with a streaming DF on the left is not supported
+  + Full outer join with a streaming Dataset is not supported

-  + Right outer join with a streaming DF on the right is not supported
+  + Left outer join with a streaming Dataset on the left is not supported

-- Stream-stream joins are not yet supported
+  + Right outer join with a streaming Dataset on the right is not supported
+
+- Any kind of joins between two streaming Datasets are not yet supported.

 In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not makes sense on a streaming Dataset. Rather those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
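
To make the list above concrete: an unsupported operation such as a bare sort is rejected when the streaming query is started, not when it is defined. A minimal sketch — the socket source host/port are illustrative:

{% highlight scala %}
import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Sorting without a preceding aggregation is unsupported on a streaming
// Dataset, so start() fails with an AnalysisException of the form
// "operation XYZ is not supported with streaming DataFrames/Datasets".
val query = lines.sort($"value")
  .writeStream
  .format("console")
  .start()
{% endhighlight %}
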
@@ -753,7 +755,7 @@ In addition, there are some Dataset methods that will not work on streaming Data
 If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".

 ## Starting Streaming Queries
-Once you have defined the final result DataFrame/Dataset, all that is left is for you start the StreamingQuery. To do that, you have to use the
+Once you have defined the final result DataFrame/Dataset, all that is left is for you start the streaming computation. To do that, you have to use the
 `DataStreamWriter` (
 [Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/
 [Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/
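
For reference, the `DataStreamWriter` flow this section introduces looks roughly like the following sketch, reusing `csvDF` from the earlier source examples; the Parquet sink, paths, and output mode are illustrative assumptions:

{% highlight scala %}
// Start the streaming computation: write results out as Parquet files,
// with a checkpoint location for fault tolerance.
val query = csvDF.writeStream
  .outputMode("append")
  .format("parquet")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("path", "/path/to/output")
  .start()

query.awaitTermination()  // block until the query is stopped or fails
{% endhighlight %}
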
@@ -867,7 +869,7 @@ aggDF
   .format("memory")
   .start()

-spark.sql("select * from aggregates).show() // interactively query in-memory table
+spark.sql("select * from aggregates").show() // interactively query in-memory table
 {% endhighlight %}

 </div>
@@ -907,7 +909,7 @@ aggDF
   .format("memory")
   .start();

-spark.sql("select * from aggregates).show(); // interactively query in-memory table
+spark.sql("select * from aggregates").show(); // interactively query in-memory table
 {% endhighlight %}

 </div>
@@ -947,7 +949,7 @@ aggDF\
   .format("memory")\
   .start()

-spark.sql("select * from aggregates).show() # interactively query in-memory table
+spark.sql("select * from aggregates").show() # interactively query in-memory table
 {% endhighlight %}

 </div>
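
The three fixes above close an unterminated string literal in the guide's memory-sink example. For context, the surrounding pattern is roughly as follows — a sketch assuming an aggregated streaming DataFrame `aggDF` and a SparkSession named `spark`:

{% highlight scala %}
val query = aggDF.writeStream
  .queryName("aggregates")  // name under which the in-memory table is registered
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()  // interactively query in-memory table
{% endhighlight %}
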
@@ -1144,7 +1146,7 @@ aggDF\
 - Examples: See and run the
 [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)
 examples.
-- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming(https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
+- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)