diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5645996de5a6..ed77aa970abb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -813,7 +813,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
     ssc = new StreamingContext(conf, Milliseconds(100))
     val input = ssc.receiverStream(new TestReceiver)
-    val latch = new CountDownLatch(1)
     @volatile var stopping = false
     input.count().foreachRDD { rdd =>
       // Make sure we can read from BlockRDD
@@ -823,18 +822,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
         new Thread() {
           setDaemon(true)
           override def run(): Unit = {
-            ssc.stop(stopSparkContext = true, stopGracefully = false)
-            latch.countDown()
+            ssc.stop(stopSparkContext = false, stopGracefully = false)
           }
         }.start()
       }
     }
     ssc.start()
     ssc.awaitTerminationOrTimeout(60000)
-    // Wait until `ssc.top` returns. Otherwise, we may finish this test too fast and leak an active
-    // SparkContext. Note: the stop codes in `after` will just do nothing if `ssc.stop` in this test
-    // is running.
-    assert(latch.await(60, TimeUnit.SECONDS))
+    ssc.sc.stop()
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
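
For context, here is a minimal, self-contained sketch of the shutdown pattern the patch moves to: the daemon thread stops only the StreamingContext (`stopSparkContext = false`), which unblocks `awaitTerminationOrTimeout`, and the calling thread then stops the SparkContext synchronously, so no `CountDownLatch` is needed to keep the test alive until an asynchronous stop finishes. This is an illustrative standalone program, not the suite's code: `queueStream` stands in for the test's `TestReceiver`, and the object name and `local[2]` master are assumptions.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Hypothetical standalone reproduction of the test's shutdown sequence.
object ShutdownSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("shutdown-sketch")
    val ssc = new StreamingContext(conf, Milliseconds(100))

    // A trivial input so start() has an output operation to run
    // (stands in for the suite's receiverStream(new TestReceiver)).
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.makeRDD(1 to 10))
    ssc.queueStream(queue).count().foreachRDD { rdd =>
      if (rdd.collect().headOption.getOrElse(0L) > 0) {
        // Stop only the StreamingContext from a daemon thread. Because the
        // SparkContext stays up, the main thread does not have to wait on a
        // latch for this asynchronous stop to complete.
        new Thread() {
          setDaemon(true)
          override def run(): Unit =
            ssc.stop(stopSparkContext = false, stopGracefully = false)
        }.start()
      }
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(60000)
    // The calling thread stops the SparkContext synchronously, so execution
    // cannot fall through while a background stop is still tearing it down.
    ssc.sparkContext.stop()
  }
}
```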