Commit 9928ca5

Add Accumulator and Broadcast Scala example for Streaming

1 parent 4af647c

2 files changed: +150 -5 lines

docs/streaming-programming-guide.md

Lines changed: 89 additions & 0 deletions
@@ -1415,6 +1415,95 @@ Note that the connections in the pool should be lazily created on demand and tim

***

## Accumulator and Broadcast

Accumulators and Broadcast variables cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and also use Accumulators or Broadcast variables, you have to create lazily instantiated singleton instances for them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += 1
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
  println(output)
  println("Dropped " + droppedWordsCounter.value + " word(s) totally")
  println("Appending to " + outputFile.getAbsolutePath)
  Files.append(output + "\n", outputFile, Charset.defaultCharset())
})

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

TODO

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

TODO

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
</div>
</div>
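
The snippet above leaves out how the streaming context itself is created and recovered. Below is a rough sketch of the surrounding driver program, mirroring the structure of the linked `RecoverableNetworkWordCount` example (names such as `createContext`, `checkpointDirectory`, `ip`, `port` and `outputPath` follow that example; in the full program they come from the command-line arguments). On the first run, `StreamingContext.getOrCreate` calls the creating function; after a driver failure it rebuilds the context from checkpoint instead, which is why the `foreachRDD` closure must obtain the Broadcast and Accumulator through the lazily instantiated singletons rather than capture references created before the failure.

{% highlight scala %}

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
  : StreamingContext = {
  val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
  // Create the context with a 1 second batch interval and enable checkpointing
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  ssc.checkpoint(checkpointDirectory)

  val lines = ssc.socketTextStream(ip, port)
  val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
    // ... the body shown above, which looks up WordBlacklist and DroppedWordsCounter ...
  })
  ssc
}

// First run: build a fresh context. After a driver failure: recover the context
// (and its registered DStream operations) from the checkpoint directory, at which
// point the lazy singletons re-create the Broadcast and Accumulator on demand.
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
  () => createContext(ip, port, outputPath, checkpointDirectory))
ssc.start()
ssc.awaitTermination()

{% endhighlight %}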

***

## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done in a way that allows it to be restarted on driver failures, which is achieved by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example, which modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL: each RDD is converted to a DataFrame, registered as a temporary table, and then queried using SQL.
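
A minimal sketch of that singleton pattern, analogous to the Accumulator and Broadcast singletons above (the object name `SQLContextSingleton` and the reuse of the `words` DStream from the quick example are illustrative assumptions rather than text reproduced from the guide):

{% highlight scala %}

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/** Lazily instantiated singleton instance of SQLContext. */
object SQLContextSingleton {

  @transient private var instance: SQLContext = null

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

words.foreachRDD { rdd =>
  // Get or create the singleton SQLContext from the RDD's SparkContext, so the
  // same code keeps working after the StreamingContext is recovered from checkpoint
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert the RDD[String] of words to a DataFrame, register it as a
  // temporary table, and compute the word counts with SQL
  val wordsDataFrame = rdd.toDF("word")
  wordsDataFrame.registerTempTable("words")
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

{% endhighlight %}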

examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

Lines changed: 61 additions & 5 deletions
@@ -23,13 +23,55 @@ import java.nio.charset.Charset
 
 import com.google.common.io.Files
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{Accumulator, SparkConf, SparkContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
 import org.apache.spark.util.IntParam
 
 /**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register `Broadcast`.
+ */
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+/**
+ * Use this singleton to get or register `Accumulator`.
+ */
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
+ * they can be registered on driver failures.
  *
  * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
  * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -75,10 +117,24 @@ object RecoverableNetworkWordCount {
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
-      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
-      println(counts)
+      // Get or register the blacklist Broadcast
+      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+      // Get or register the droppedWordsCounter Accumulator
+      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+      // Use blacklist to drop words and use droppedWordsCounter to count them
+      val counts = rdd.filter { case (word, count) =>
+        if (blacklist.value.contains(word)) {
+          droppedWordsCounter += 1
+          false
+        } else {
+          true
+        }
+      }.collect().mkString("[", ", ", "]")
+      val output = "Counts at time " + time + " " + counts
+      println(output)
+      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
       println("Appending to " + outputFile.getAbsolutePath)
-      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+      Files.append(output + "\n", outputFile, Charset.defaultCharset())
     })
     ssc
   }
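
For readers new to these APIs, here is a small self-contained sketch, not taken from this commit (the object name AccumulatorSemanticsSketch and the local[2] master are made up for illustration), of the Accumulator and Broadcast behavior the example relies on: the filter closure updates the accumulator inside tasks on the executors, and the merged value only becomes visible to the driver after an action such as collect() has run.

{% highlight scala %}

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("AccumulatorSemanticsSketch").setMaster("local[2]"))

    val droppedWords = sc.accumulator(0L, "droppedWords")
    val blacklist = sc.broadcast(Seq("a", "b"))

    // The filter closure runs on the executors; each task adds to its local
    // copy of the accumulator, and Spark merges the copies on the driver.
    val kept = sc.parallelize(Seq("a", "b", "c", "d")).filter { word =>
      if (blacklist.value.contains(word)) {
        droppedWords += 1L
        false
      } else {
        true
      }
    }.collect() // the action that actually runs the tasks

    // Only now is the merged accumulator value meaningful on the driver.
    println("kept = " + kept.mkString(", ") + ", dropped = " + droppedWords.value)
    sc.stop()
  }
}

{% endhighlight %}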

0 commit comments
