From 89d1d35562bdb47c54464f31adeddadbe3a3ec1b Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 29 Mar 2017 12:43:49 +0800 Subject: [PATCH 1/2] Flaky Test: org.apache.spark.streaming.StreamingContextSuite --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5645996de5a6..18c0524c706f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -823,7 +823,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo new Thread() { setDaemon(true) override def run(): Unit = { - ssc.stop(stopSparkContext = true, stopGracefully = false) + ssc.stop(stopSparkContext = false, stopGracefully = false) latch.countDown() } }.start() @@ -831,10 +831,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo } ssc.start() ssc.awaitTerminationOrTimeout(60000) - // Wait until `ssc.top` returns. Otherwise, we may finish this test too fast and leak an active - // SparkContext. Note: the stop codes in `after` will just do nothing if `ssc.stop` in this test - // is running. - assert(latch.await(60, TimeUnit.SECONDS)) + ssc.sc.stop() } def addInputStream(s: StreamingContext): DStream[Int] = { From c68c285d3daa2c2dc584835989f9d23cd3fe398d Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 29 Mar 2017 12:47:32 +0800 Subject: [PATCH 2/2] update --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 18c0524c706f..ed77aa970abb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -813,7 +813,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName) ssc = new StreamingContext(conf, Milliseconds(100)) val input = ssc.receiverStream(new TestReceiver) - val latch = new CountDownLatch(1) @volatile var stopping = false input.count().foreachRDD { rdd => // Make sure we can read from BlockRDD @@ -824,7 +823,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo setDaemon(true) override def run(): Unit = { ssc.stop(stopSparkContext = false, stopGracefully = false) - latch.countDown() } }.start() }