-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-8743] [Streaming]: Deregister Codahale metrics for streaming when StreamingContext is closed #7362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-8743] [Streaming]: Deregister Codahale metrics for streaming when StreamingContext is closed #7362
Changes from 1 commit
d8cb577
59227a4
8873180
daedaa5
0e8007a
8b26397
7d998a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
…est to check the registration and de-registration
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,8 +33,12 @@ import org.apache.spark.storage.StorageLevel | |
| import org.apache.spark.streaming.dstream.DStream | ||
| import org.apache.spark.streaming.receiver.Receiver | ||
| import org.apache.spark.util.Utils | ||
| import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite} | ||
|
|
||
| import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, your imports are not organized properly. Please pay careful attention to this. spark imports go together and are sorted alphabetically. scala imports should go together too.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made the change in the next commit. |
||
| import org.apache.spark.metrics.MetricsSystem | ||
| import org.apache.spark.metrics.source.Source | ||
| import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} | ||
| import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} | ||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { | ||
|
|
||
|
|
@@ -299,6 +303,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo | |
| Thread.sleep(100) | ||
| } | ||
|
|
||
| test ("registering and de-registering of streamingSource") { | ||
| val conf = new SparkConf().setMaster(master).setAppName(appName) | ||
| ssc = new StreamingContext(conf, batchDuration) | ||
| assert(ssc.getState() === StreamingContextState.INITIALIZED) | ||
| addInputStream(ssc).register() | ||
| ssc.start() | ||
|
|
||
| val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) | ||
| val streamingSource = StreamingContextSuite.getStreamingSource(ssc) | ||
| assert(sources.contains(streamingSource)) | ||
| assert(ssc.getState() === StreamingContextState.ACTIVE) | ||
| Thread.sleep(100) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any particular reason why you added this sleep?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed it. Added it during my runs. Thanks. |
||
|
|
||
| ssc.stop() | ||
| val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) | ||
| val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) | ||
| assert(ssc.getState() === StreamingContextState.STOPPED) | ||
| assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) | ||
| } | ||
|
|
||
| test("awaitTermination") { | ||
| ssc = new StreamingContext(master, appName, batchDuration) | ||
| val inputStream = addInputStream(ssc) | ||
|
|
@@ -811,3 +835,19 @@ package object testPackage extends Assertions { | |
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Helper methods for testing StreamingContextSuite | ||
| * This includes methods to access private methods and fields in StreamingContext and MetricsSystem | ||
| */ | ||
|
|
||
| private object StreamingContextSuite extends PrivateMethodTester { | ||
| private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) | ||
| private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { | ||
| metricsSystem.invokePrivate(_sources()) | ||
| } | ||
| private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource) | ||
| private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { | ||
| streamingContext.invokePrivate(_streamingSource()) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a big deal, but I don't see why this was changed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It previously held the block of code that did the registration.
Here it simply initializes the streamingSource since the registration is moved into start()