From dcce428b66aa0ffd0c468302598b823f8f7f4854 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 18 Mar 2016 19:04:56 +0800 Subject: [PATCH 1/3] create je_fix --- .../examples/mllib/JavaStreamingTestExample.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index 2197ef9481a7..a8794a55e9e7 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -56,6 +56,9 @@ * batches processed exceeds `numBatchesTimeout`. */ public class JavaStreamingTestExample { + + private static int timeoutCounter = 0; + public static void main(String[] args) { if (args.length != 3) { System.err.println("Usage: JavaStreamingTestExample " + @@ -94,22 +97,21 @@ public BinarySample call(String line) throws Exception { // $example off$ // Stop processing if test becomes significant or we time out - final Accumulator timeoutCounter = - ssc.sparkContext().accumulator(numBatchesTimeout); + timeoutCounter = numBatchesTimeout; out.foreachRDD(new VoidFunction>() { @Override public void call(JavaRDD rdd) throws Exception { - timeoutCounter.add(-1); + timeoutCounter -= 1; - long cntSignificant = rdd.filter(new Function() { + boolean anySignificant = ! rdd.filter(new Function() { @Override public Boolean call(StreamingTestResult v) throws Exception { return v.pValue() < 0.05; } - }).count(); + }).isEmpty(); - if (timeoutCounter.value() <= 0 || cntSignificant > 0) { + if (timeoutCounter <= 0 || anySignificant) { rdd.context().stop(); } } From 09ad928e3f5efde847eae324b648bfed227c0f34 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 18 Mar 2016 19:07:47 +0800 Subject: [PATCH 2/3] del throws --- .../spark/examples/mllib/JavaStreamingTestExample.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index a8794a55e9e7..6e360a970eae 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -79,7 +79,7 @@ public static void main(String[] args) { JavaDStream data = ssc.textFileStream(dataDir).map( new Function() { @Override - public BinarySample call(String line) throws Exception { + public BinarySample call(String line) { String[] ts = line.split(","); boolean label = Boolean.valueOf(ts[0]); double value = Double.valueOf(ts[1]); @@ -101,12 +101,12 @@ public BinarySample call(String line) throws Exception { out.foreachRDD(new VoidFunction>() { @Override - public void call(JavaRDD rdd) throws Exception { + public void call(JavaRDD rdd) { timeoutCounter -= 1; boolean anySignificant = ! rdd.filter(new Function() { @Override - public Boolean call(StreamingTestResult v) throws Exception { + public Boolean call(StreamingTestResult v) { return v.pValue() < 0.05; } }).isEmpty(); From 9a16f30c3f4d3ec2299c9e476e48608406cb9c46 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 18 Mar 2016 19:22:31 +0800 Subject: [PATCH 3/3] del space --- .../apache/spark/examples/mllib/JavaStreamingTestExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index 6e360a970eae..4c8755916c1a 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -104,7 +104,7 @@ public BinarySample call(String line) { public void call(JavaRDD rdd) { timeoutCounter -= 1; - boolean anySignificant = ! rdd.filter(new Function() { + boolean anySignificant = !rdd.filter(new Function() { @Override public Boolean call(StreamingTestResult v) { return v.pValue() < 0.05;