Skip to content

Commit 204c9de

Browse files
zhengruifeng authored and MLnick committed
[MINOR][DOC] Add JavaStreamingTestExample
## What changes were proposed in this pull request?

Add the Java example of StreamingTest.

## How was this patch tested?

Manual tests in CLI: `bin/run-example mllib.JavaStreamingTestExample dataDir 5 100`

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes apache#11776 from zhengruifeng/streaming_je.
1 parent 30c1884 commit 204c9de

File tree

2 files changed

+128
-0
lines changed

2 files changed

+128
-0
lines changed

docs/mllib-statistics.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,13 @@ provides streaming hypothesis testing.
544544

545545
{% include_example scala/org/apache/spark/examples/mllib/StreamingTestExample.scala %}
546546
</div>
547+
548+
<div data-lang="java" markdown="1">
549+
[`StreamingTest`](api/java/index.html#org.apache.spark.mllib.stat.test.StreamingTest)
550+
provides streaming hypothesis testing.
551+
552+
{% include_example java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java %}
553+
</div>
547554
</div>
548555

549556

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.mllib;
19+
20+
21+
import org.apache.spark.Accumulator;
22+
import org.apache.spark.api.java.function.VoidFunction;
23+
import org.apache.spark.api.java.JavaRDD;
24+
import org.apache.spark.api.java.function.Function;
25+
// $example on$
26+
import org.apache.spark.mllib.stat.test.BinarySample;
27+
import org.apache.spark.mllib.stat.test.StreamingTest;
28+
import org.apache.spark.mllib.stat.test.StreamingTestResult;
29+
// $example off$
30+
import org.apache.spark.SparkConf;
31+
import org.apache.spark.streaming.Duration;
32+
import org.apache.spark.streaming.Seconds;
33+
import org.apache.spark.streaming.api.java.JavaDStream;
34+
import org.apache.spark.streaming.api.java.JavaStreamingContext;
35+
import org.apache.spark.util.Utils;
36+
37+
38+
/**
39+
* Perform streaming testing using Welch's 2-sample t-test on a stream of data, where the data
40+
* stream arrives as text files in a directory. Stops when the two groups are statistically
41+
* significant (p-value < 0.05) or after a user-specified timeout in number of batches is exceeded.
42+
*
43+
* The rows of the text files must be in the form `Boolean, Double`. For example:
44+
* false, -3.92
45+
* true, 99.32
46+
*
47+
* Usage:
48+
* JavaStreamingTestExample <dataDir> <batchDuration> <numBatchesTimeout>
49+
*
50+
* To run on your local machine using the directory `dataDir` with 5 seconds between each batch and
51+
* a timeout after 100 insignificant batches, call:
52+
* $ bin/run-example mllib.JavaStreamingTestExample dataDir 5 100
53+
*
54+
* As you add text files to `dataDir` the significance test will continually update every
55+
* `batchDuration` seconds until the test becomes significant (p-value < 0.05) or the number of
56+
* batches processed exceeds `numBatchesTimeout`.
57+
*/
58+
public class JavaStreamingTestExample {
59+
public static void main(String[] args) {
60+
if (args.length != 3) {
61+
System.err.println("Usage: JavaStreamingTestExample " +
62+
"<dataDir> <batchDuration> <numBatchesTimeout>");
63+
System.exit(1);
64+
}
65+
66+
String dataDir = args[0];
67+
Duration batchDuration = Seconds.apply(Long.valueOf(args[1]));
68+
int numBatchesTimeout = Integer.valueOf(args[2]);
69+
70+
SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample");
71+
JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration);
72+
73+
ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
74+
75+
// $example on$
76+
JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
77+
new Function<String, BinarySample>() {
78+
@Override
79+
public BinarySample call(String line) throws Exception {
80+
String[] ts = line.split(",");
81+
boolean label = Boolean.valueOf(ts[0]);
82+
double value = Double.valueOf(ts[1]);
83+
return new BinarySample(label, value);
84+
}
85+
});
86+
87+
StreamingTest streamingTest = new StreamingTest()
88+
.setPeacePeriod(0)
89+
.setWindowSize(0)
90+
.setTestMethod("welch");
91+
92+
JavaDStream<StreamingTestResult> out = streamingTest.registerStream(data);
93+
out.print();
94+
// $example off$
95+
96+
// Stop processing if test becomes significant or we time out
97+
final Accumulator<Integer> timeoutCounter =
98+
ssc.sparkContext().accumulator(numBatchesTimeout);
99+
100+
out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
101+
@Override
102+
public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
103+
timeoutCounter.add(-1);
104+
105+
long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
106+
@Override
107+
public Boolean call(StreamingTestResult v) throws Exception {
108+
return v.pValue() < 0.05;
109+
}
110+
}).count();
111+
112+
if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
113+
rdd.context().stop();
114+
}
115+
}
116+
});
117+
118+
ssc.start();
119+
ssc.awaitTermination();
120+
}
121+
}

0 commit comments

Comments
 (0)