[SPARK-7786][STREAMING] Allow StreamingListener to be specified in SparkConf and loaded when creating StreamingContext #6380
Changes to StreamingListener.scala (package org.apache.spark.streaming.scheduler):

```diff
@@ -17,6 +17,8 @@
 package org.apache.spark.streaming.scheduler

+import org.apache.spark.scheduler.StatsReportListener
+
 import scala.collection.mutable.Queue

 import org.apache.spark.util.Distribution
```

Reviewer comments on the added `import org.apache.spark.scheduler.StatsReportListener`:

Contributor: Why was this needed?

Contributor: You have not addressed this. Why was this needed? I don't see anything that required this.
```diff
@@ -81,10 +83,13 @@ trait StreamingListener {
 /**
  * :: DeveloperApi ::
  * A simple StreamingListener that logs summary statistics across Spark Streaming batches
- * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
+ * @param numBatchInfos Number of last batches to consider for generating statistics (default: 100)
  */
 @DeveloperApi
-class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+class StatsReportListener(numBatchInfos: Int) extends StreamingListener {
+
+  def this() = this(100)

   // Queue containing latest completed batches
   val batchInfos = new Queue[BatchInfo]()
```
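The likely reason for replacing the default argument with an explicit auxiliary constructor: a Scala default argument does not produce a real zero-argument JVM constructor, so a class named in `spark.streaming.extraListeners` could not be instantiated by reflection. The sketch below shows the kind of loading this enables; it is an assumption that the PR mirrors how `spark.extraListeners` is handled for `SparkContext` (try a `SparkConf` constructor, then fall back to a zero-arg one), and the helper name is invented for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.StreamingListener

object ExtraListenerLoading {
  // Hypothetical helper, not the PR's actual code: instantiate each class name
  // listed in spark.streaming.extraListeners via reflection.
  def loadStreamingListeners(conf: SparkConf): Seq[StreamingListener] = {
    conf.getOption("spark.streaming.extraListeners").toSeq
      .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))
      .map { className =>
        val clazz = Class.forName(className)
        try {
          // Prefer a constructor that accepts the SparkConf...
          clazz.getConstructor(classOf[SparkConf])
            .newInstance(conf).asInstanceOf[StreamingListener]
        } catch {
          case _: NoSuchMethodException =>
            // ...otherwise fall back to a zero-arg constructor, which is why
            // StatsReportListener now needs the explicit `def this() = this(100)`.
            clazz.getConstructor().newInstance().asInstanceOf[StreamingListener]
        }
      }
  }
}
```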
Changes to StreamingListenerSuite.scala (package org.apache.spark.streaming):

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming

 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.JavaConversions._
 import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -29,7 +30,7 @@ import org.apache.spark.streaming.scheduler._
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}

 class StreamingListenerSuite extends TestSuiteBase with Matchers {
```
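If the line added in the first hunk above is indeed `scala.collection.JavaConversions._` (an inference from the hunk's line counts, not stated explicitly in the page), the most plausible reason is that the listener bus exposes its registered listeners as a Java collection, and the implicit conversions are what make Scala collection methods such as `.exists` usable on it in the test further down. A minimal, self-contained illustration of that mechanism:

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConversions._

// A java.util.List has no .exists method of its own; the implicit wrapper
// from JavaConversions layers Scala's collection API on top of it.
val javaListeners = new CopyOnWriteArrayList[String]()
javaListeners.add("BatchInfoCollector")
javaListeners.add("ReceiverInfoCollector")

val hasBatchCollector = javaListeners.exists(_ == "BatchInfoCollector")  // true
```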
```diff
@@ -140,6 +141,16 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     }
     true
   }
+
+  test("registering listeners via spark.streaming.extraListeners") {
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+      .set("spark.streaming.extraListeners", classOf[BatchInfoCollector].getName + "," +
```
Reviewer comments on the `spark.streaming.extraListeners` setting above:

Contributor: I missed it earlier, this unit test does not cover the two cases, one without constructor and one with constructor. Also, the …

Contributor: Also please include a third case where an incorrect StreamingListener with 2 args is specified, which is expected to throw an exception. Test with …

Contributor (author): The change is already in StreamingListenerSuite.scala (and the class …
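A rough sketch of the extra coverage the reviewer is asking for, written as it might appear inside StreamingListenerSuite (which, per the hunks above, already has the ScalaTest, Spark, and JavaConversions imports in scope). The listener class names are invented for illustration, the assumption that the "with constructor" case takes a `SparkConf` mirrors the `spark.extraListeners` convention, and the exact exception type is a guess, so a generic `intercept[Exception]` is used.

```scala
// Hypothetical listener classes for the three cases; they would sit alongside
// BatchInfoCollector / ReceiverInfoCollector at the bottom of the suite file.
class NoArgConstructorListener extends StreamingListener
class SparkConfConstructorListener(conf: SparkConf) extends StreamingListener
class TwoArgConstructorListener(a: Int, b: Int) extends StreamingListener

// Inside StreamingListenerSuite:
test("extraListeners handles zero-arg and SparkConf constructors, rejects others") {
  val conf = new SparkConf().setMaster("local").setAppName("test")
    .set("spark.streaming.extraListeners",
      classOf[NoArgConstructorListener].getName + "," +
        classOf[SparkConfConstructorListener].getName)
  val ssc = new StreamingContext(conf, Seconds(1))
  try {
    assert(ssc.scheduler.listenerBus.listeners.exists(_.isInstanceOf[NoArgConstructorListener]))
    assert(ssc.scheduler.listenerBus.listeners.exists(_.isInstanceOf[SparkConfConstructorListener]))
  } finally {
    ssc.stop()
  }

  // A listener whose only constructor takes two arguments cannot be instantiated
  // reflectively, so StreamingContext creation is expected to fail.
  val badConf = new SparkConf().setMaster("local").setAppName("test")
    .set("spark.streaming.extraListeners", classOf[TwoArgConstructorListener].getName)
  intercept[Exception] {
    new StreamingContext(badConf, Seconds(1))
  }
}
```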
The test continues:

```diff
+        classOf[ReceiverInfoCollector].getName)
+    val scc = new StreamingContext(conf, Seconds(1))
```
Reviewer comments on `val scc = new StreamingContext(conf, Seconds(1))`:

Contributor: This can lead to a leak in the StreamingContext and underlying SparkContext. Please put the whole in …

Contributor: StreamingListenerSuite extends TestSuiteBase, so I think we can use the …

Contributor (author): Is this still the case when the context is never started?

Contributor: Yes, the StreamingContext is being created, which means an underlying SparkContext is being created, which is not being cleaned up.
The test ends as follows:

```diff
+
+    scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[BatchInfoCollector] }
+    scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[ReceiverInfoCollector] }
+  }
 }

 /** Listener that collects information on processed batches */
```
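A minimal sketch of the cleanup the reviewers above are asking for, so the SparkContext created by this test does not leak. Whether TestSuiteBase provides a ready-made helper for this (as one reviewer suggests) is not visible in this excerpt, so the sketch simply uses try/finally; the checks are also wrapped in `assert`, since the bare `exists` calls in the diff above discard their results.

```scala
test("registering listeners via spark.streaming.extraListeners") {
  val conf = new SparkConf().setMaster("local").setAppName("test")
    .set("spark.streaming.extraListeners", classOf[BatchInfoCollector].getName + "," +
      classOf[ReceiverInfoCollector].getName)
  val ssc = new StreamingContext(conf, Seconds(1))
  try {
    assert(ssc.scheduler.listenerBus.listeners.exists(_.isInstanceOf[BatchInfoCollector]))
    assert(ssc.scheduler.listenerBus.listeners.exists(_.isInstanceOf[ReceiverInfoCollector]))
  } finally {
    // stop() also shuts down the underlying SparkContext by default,
    // which is the leak the reviewers are pointing at.
    ssc.stop()
  }
}
```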
Reviewer comment (on wording not shown in this excerpt):

Contributor: This is incorrect:
"when initializing SparkContext" ==> StreamingContext
"Spark's streaming listener bus" ==> "StreamingContext's listener bus"