Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
73549ff
Add func setupStreamingListeners to add listeners from
Jeffrharr May 21, 2015
4eb6987
Merge branch 'master' into SparkConf_Listeners
Jeffrharr May 23, 2015
f8ad629
Use class API to add listener. Modify StatsReportListener to have a zero
Jeffrharr May 23, 2015
3c1a19d
Reduced ambiguity by removing default values from StatsReportListener…
Jeffrharr May 23, 2015
f3a3fee
Add test
Jeffrharr May 23, 2015
e8506d4
Fix minor syntax derp
Jeffrharr May 23, 2015
6453c90
Fix style issues and small type in StreamingListenerSuite
Jeffrharr May 24, 2015
d92d55b
Maintain consistancy with property naming conventions
Jeffrharr May 27, 2015
00c0409
Fix test for new format
Jeffrharr May 27, 2015
186766f
Merge remote-tracking branch 'upstream/master' into SparkConf_Listeners
Jeffrharr May 28, 2015
c94982f
Test renamed to spark.streaming.listeners
Jeffrharr May 28, 2015
40d91ed
Merge remote-tracking branch 'upstream/master' into SparkConf_Listeners
Jeffrharr Jul 10, 2015
d7e7b2e
Update name to spark.streaming.extraListeners / fix docs / change
Jeffrharr Jul 10, 2015
33d3179
Default to 100
Jeffrharr Jul 10, 2015
233335d
Update Class.forName to org.apache.spark.util.Utils.classForName to
Jeffrharr Jul 10, 2015
28e7f27
Use spark.streaming.extraListeners
Jeffrharr Jul 10, 2015
af41098
checking
Jeffrharr Jul 10, 2015
9ec68c1
Changes in clarity, removed redundant error handling
Jeffrharr Jul 13, 2015
70c31e4
Fix documentation, add more/clearer test cases
Jeffrharr Jul 16, 2015
aff08ff
Accidental insertion removed
Jeffrharr Jul 16, 2015
fa8c752
Correct test text
Jeffrharr Jul 16, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix style issues and small type in StreamingListenerSuite
  • Loading branch information
Jeffrharr committed May 24, 2015
commit 6453c90cffd17705c22f214b67f03eedca8baf24
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
classOf[ReceiverInfoCollector].getName)
val scc = new StreamingContext(conf, Seconds(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can lead to a leak in the StreamingContext and underlying SparkContext. Please put the whole in

var ssc: StremaingContext = null
try {
  ssc = new StreamingContext...

} finally {
   if (ssc != null) {
     ssc.stop()
   }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamingListenerSuite extends TestSuiteBase, so I think we can use the withStreamingContext(ssc) { } construct to manage cleanup for us: https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala#L270

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still the case when the context is never started?

On Fri, Jul 17, 2015 at 1:55 AM, Tathagata Das [email protected]
wrote:

In
streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
#6380 (comment):

@@ -140,6 +141,37 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
true
}
+

  • test("registering listeners via spark.streaming.extraListeners") {
  • // Test for success with zero-argument constructor and sparkConf constructor
  • val conf = new SparkConf().setMaster("local").setAppName("test")
  •  .set("spark.streaming.extraListeners",
    
  •    classOf[StreamingListenerThatAcceptsSparkConf].getName + "," +
    
  •    classOf[ReceiverInfoCollector].getName)
    
  • val scc = new StreamingContext(conf, Seconds(1))

This can lead to a leak in the StreamingContext and underlying
SparkContext. Please put the whole in

var ssc: StremaingContext = null
try {
ssc = new StreamingContext...

} finally {
if (ssc != null) {
ssc.stop()
}
}


Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/6380/files#r34868938.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the StreamingContext is being created, which means an underlying SparkContext is being created, which is not being cleaned up.


scc.scheduler.listenerBus.listeners.collect { case x: BatchInfoCollector => x}.size should be (1)
scc.scheduler.listenerBus.listeners.collect {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its easier to understand if this check is replaced with ... listeners.exists { _.isInstanceOf[...] }

case x: ReceiverInfoCollector =>
}.size should be (1)
case x: BatchInfoCollector => x }.size should be (1)
scc.scheduler.listenerBus.listeners.collect {
case x: ReceiverInfoCollector => x }.size should be (1)
}
}

Expand Down