Add Java and Python examples
zsxwing committed Dec 21, 2015
commit 9e241e791e5ce2cb0cc3dd7c609339ca8ea11129
94 changes: 92 additions & 2 deletions docs/streaming-programming-guide.md
@@ -1484,7 +1484,66 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
<div data-lang="java" markdown="1">
{% highlight java %}

- TODO
class JavaWordBlacklist {

  // Lazily instantiated broadcast variable; double-checked locking on the class
  // keeps initialization thread-safe.
  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  // Lazily instantiated accumulator, re-registered the same way after recovery.
  private static volatile Accumulator<Integer> instance = null;

  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
    // Get or register the blacklist Broadcast
    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
        if (blacklist.value().contains(wordCount._1())) {
          droppedWordsCounter.add(wordCount._2());
          return false;
        } else {
          return true;
        }
      }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
    System.out.println(output);
    System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
    // Review comment (Contributor): remove unnecessary stuff.
    System.out.println("Appending to " + outputFile.getAbsolutePath());
    Files.append(output + "\n", outputFile, Charset.defaultCharset());
    return null;
  }
});

{% endhighlight %}
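
For this pattern to matter, the driver has to come back from a checkpoint: on the first
batch after recovery, the `getInstance` calls above lazily re-create and re-register the
broadcast variable and accumulator. A minimal sketch of that wiring, assuming an
illustrative `checkpointDirectory` and stream setup (the complete version is in the
JavaRecoverableNetworkWordCount example this commit touches):

{% highlight java %}
final String checkpointDirectory = "/tmp/checkpoint";  // illustrative path, not from this commit

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
  @Override
  public JavaStreamingContext create() {
    // Only runs when no checkpoint exists yet
    JavaStreamingContext jssc = new JavaStreamingContext(new SparkConf(), Durations.seconds(1));
    jssc.checkpoint(checkpointDirectory);
    // ... build the DStreams here, ending with the wordCounts.foreachRDD(...) above ...
    return jssc;
  }
};

// First run: creates a fresh context via the factory.
// After a driver failure: rebuilds the context from the checkpoint, and the
// foreachRDD function re-registers the singletons on its first invocation.
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
jssc.start();
jssc.awaitTermination();
{% endhighlight %}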

@@ -1493,7 +1552,38 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
<div data-lang="python" markdown="1">
{% highlight python %}

- TODO
def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']

def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            return False
        else:
            return True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
    print(counts)
    print("Dropped %d word(s) totally" % droppedWordsCounter.value)
    print("Appending to " + os.path.abspath(outputPath))
    with open(outputPath, 'a') as f:
        f.write(counts + "\n")

wordCounts.foreachRDD(echo)

{% endhighlight %}

examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,17 +21,22 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import com.google.common.io.Files;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -41,7 +46,48 @@
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

/**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register `Broadcast`.
 */

Review comment (Contributor): "broadcast variable". Why is Broadcast in single quotes?

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

/**
* Use this singleton to get or register `Accumulator`.
 */

Review comment (Contributor): Similar comment as above for Broadcast: "an accumulator".

class JavaDroppedWordsCounter {

  private static volatile Accumulator<Integer> instance = null;

  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

/**
* Counts words in text encoded with UTF8 received from the network every second. This example also
* shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
* they can be registered on driver failures.
*
* Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -111,10 +157,27 @@ public Integer call(Integer i1, Integer i2) {
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-   String counts = "Counts at time " + time + " " + rdd.collect();
-   System.out.println(counts);
    // Get or register the blacklist Broadcast
    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
        if (blacklist.value().contains(wordCount._1())) {
          droppedWordsCounter.add(wordCount._2());
          return false;
        } else {
          return true;
        }
      }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
    System.out.println(output);
    System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
    System.out.println("Appending to " + outputFile.getAbsolutePath());
-   Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+   Files.append(output + "\n", outputFile, Charset.defaultCharset());
    return null;
  }
});
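
// Aside, a sketch only (assumes Java 8, which this example does not require):
// because Function is a single-method interface, the anonymous class above
// collapses to a lambda:
//
//   String counts = rdd.filter(wordCount -> {
//     if (blacklist.value().contains(wordCount._1())) {
//       droppedWordsCounter.add(wordCount._2());
//       return false;
//     }
//     return true;
//   }).collect().toString();
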
examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -44,6 +44,20 @@
from pyspark.streaming import StreamingContext


# Get or register `Broadcast`
def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']


# Get or register `Accumulator`
def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']


def createContext(host, port, outputPath):
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
@@ -60,8 +74,22 @@ def createContext(host, port, outputPath):
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
-       counts = "Counts at time %s %s" % (time, rdd.collect())
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")
examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -124,7 +124,7 @@ object RecoverableNetworkWordCount {
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
-         droppedWordsCounter += 1
+         droppedWordsCounter += count
          false
        } else {
          true