From d8cb577b24f42a0509ee3a0fffb09181abf4137e Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 12 Jul 2015 18:38:36 -0700 Subject: [PATCH 1/7] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration --- .../spark/streaming/StreamingContext.scala | 10 +++-- .../streaming/StreamingContextSuite.scala | 44 ++++++++++++++++++- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ec49d0f42d12..6b78a82e68c2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -192,11 +192,8 @@ class StreamingContext private[streaming] ( None } - /** Register streaming source to metrics system */ + /* Initializing a streamingSource to register metrics */ private val streamingSource = new StreamingSource(this) - assert(env != null) - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) private var state: StreamingContextState = INITIALIZED @@ -606,6 +603,9 @@ class StreamingContext private[streaming] ( } shutdownHookRef = Utils.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + // Registering Streaming Metrics at the start of the StreamingContext + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => @@ -682,6 +682,8 @@ class StreamingContext private[streaming] ( logWarning("StreamingContext has already been stopped") case ACTIVE => scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) uiTab.foreach(_.detach()) StreamingContext.setActiveContext(null) waiter.notifyStop() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 56b4ce5638a5..71e35a1d9fff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -33,8 +33,12 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite} - +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source +import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} +import scala.collection.mutable.ArrayBuffer class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { @@ -299,6 +303,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo Thread.sleep(100) } + test ("registering and de-registering of streamingSource") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + ssc = new StreamingContext(conf, batchDuration) + assert(ssc.getState() === StreamingContextState.INITIALIZED) + addInputStream(ssc).register() + ssc.start() + + val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSource = StreamingContextSuite.getStreamingSource(ssc) + assert(sources.contains(streamingSource)) + assert(ssc.getState() === StreamingContextState.ACTIVE) + Thread.sleep(100) + + ssc.stop() + val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) + assert(ssc.getState() === StreamingContextState.STOPPED) + assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) @@ -811,3 +835,19 @@ package object testPackage extends Assertions { } } } + +/** + * Helper methods for testing StreamingContextSuite + * This includes methods to access private methods and fields in StreamingContext and MetricsSystem + */ + +private object StreamingContextSuite extends PrivateMethodTester { + private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) + private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { + metricsSystem.invokePrivate(_sources()) + } + private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource) + private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { + streamingContext.invokePrivate(_streamingSource()) + } +} From 59227a418a40f494a9bd25fe1a4e038f6dc55d40 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 13 Jul 2015 05:07:35 -0700 Subject: [PATCH 2/7] Changed the ordering of the imports to classify scala and spark imports --- .../apache/spark/streaming/StreamingContextSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 71e35a1d9fff..97f862c979e3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -21,6 +21,7 @@ import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.Queue +import scala.collection.mutable.ArrayBuffer import org.apache.commons.io.FileUtils import org.scalatest.concurrent.Eventually._ @@ -28,17 +29,16 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ import org.scalatest.{Assertions, BeforeAndAfter} +import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.metrics.source.Source -import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} -import scala.collection.mutable.ArrayBuffer + class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { From 8873180991a021281e1d37c70e7bf7fa53890c18 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 13 Jul 2015 05:20:39 -0700 Subject: [PATCH 3/7] Removed redundancy in imports --- .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 97f862c979e3..99ca231d983f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -28,7 +28,6 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{Assertions, BeforeAndAfter} import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} import org.apache.spark.metrics.MetricsSystem From daedaa50cb761e3d9e90213be262d0ef4e891fb4 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 13 Jul 2015 05:54:29 -0700 Subject: [PATCH 4/7] Corrected Ordering of imports --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 99ca231d983f..3cedb4f1db8a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -20,15 +20,15 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue import org.apache.commons.io.FileUtils import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} +import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source @@ -839,7 +839,6 @@ package object testPackage extends Assertions { * Helper methods for testing StreamingContextSuite * This includes methods to access private methods and fields in StreamingContext and MetricsSystem */ - private object StreamingContextSuite extends PrivateMethodTester { private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { From 0e8007a1387b51d2ccd117f00022cff550c76857 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 13 Jul 2015 06:44:50 -0700 Subject: [PATCH 5/7] moved import org.apache.spark{} to correct place --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 3cedb4f1db8a..c5a4c461ba42 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -30,13 +30,13 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { From 8b26397a401035c1925712b69b6b6cabd26bb12a Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 13 Jul 2015 06:50:01 -0700 Subject: [PATCH 6/7] Moved the scalatest.{} import --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index c5a4c461ba42..954791b38f5f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -24,11 +24,11 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Queue import org.apache.commons.io.FileUtils +import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.metrics.MetricsSystem From 7d998a35370a78a0cc9346c1ce9941561da6d2a8 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 13 Jul 2015 14:45:45 -0700 Subject: [PATCH 7/7] Removed the Thread.sleep() call --- .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 954791b38f5f..289a159d8990 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -313,7 +313,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo val streamingSource = StreamingContextSuite.getStreamingSource(ssc) assert(sources.contains(streamingSource)) assert(ssc.getState() === StreamingContextState.ACTIVE) - Thread.sleep(100) ssc.stop() val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)