Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
create je_fix
  • Loading branch information
zhengruifeng committed Mar 18, 2016
commit dcce428b66aa0ffd0c468302598b823f8f7f4854
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
* batches processed exceeds `numBatchesTimeout`.
*/
public class JavaStreamingTestExample {

private static int timeoutCounter = 0;

public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaStreamingTestExample " +
Expand Down Expand Up @@ -94,22 +97,21 @@ public BinarySample call(String line) throws Exception {
// $example off$

// Stop processing if test becomes significant or we time out
final Accumulator<Integer> timeoutCounter =
ssc.sparkContext().accumulator(numBatchesTimeout);
timeoutCounter = numBatchesTimeout;

out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
@Override
public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
timeoutCounter.add(-1);
timeoutCounter -= 1;

long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
boolean anySignificant = ! rdd.filter(new Function<StreamingTestResult, Boolean>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(No space after "!")
Looking good, thank you for indulging. The example is now about as close as possible to the scala version

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, the space is removed.

@Override
public Boolean call(StreamingTestResult v) throws Exception {
return v.pValue() < 0.05;
}
}).count();
}).isEmpty();

if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
if (timeoutCounter <= 0 || anySignificant) {
rdd.context().stop();
}
}
Expand Down