Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
73549ff
Add func setupStreamingListeners to add listeners from
Jeffrharr May 21, 2015
4eb6987
Merge branch 'master' into SparkConf_Listeners
Jeffrharr May 23, 2015
f8ad629
Use class API to add listener. Modify StatsReportListener to have a zero
Jeffrharr May 23, 2015
3c1a19d
Reduced ambiguity by removing default values from StatsReportListener…
Jeffrharr May 23, 2015
f3a3fee
Add test
Jeffrharr May 23, 2015
e8506d4
Fix minor syntax derp
Jeffrharr May 23, 2015
6453c90
Fix style issues and small type in StreamingListenerSuite
Jeffrharr May 24, 2015
d92d55b
Maintain consistancy with property naming conventions
Jeffrharr May 27, 2015
00c0409
Fix test for new format
Jeffrharr May 27, 2015
186766f
Merge remote-tracking branch 'upstream/master' into SparkConf_Listeners
Jeffrharr May 28, 2015
c94982f
Test renamed to spark.streaming.listeners
Jeffrharr May 28, 2015
40d91ed
Merge remote-tracking branch 'upstream/master' into SparkConf_Listeners
Jeffrharr Jul 10, 2015
d7e7b2e
Update name to spark.streaming.extraListeners / fix docs / change
Jeffrharr Jul 10, 2015
33d3179
Default to 100
Jeffrharr Jul 10, 2015
233335d
Update Class.forName to org.apache.spark.util.Utils.classForName to
Jeffrharr Jul 10, 2015
28e7f27
Use spark.streaming.extraListeners
Jeffrharr Jul 10, 2015
af41098
checking
Jeffrharr Jul 10, 2015
9ec68c1
Changes in clarity, removed redundant error handling
Jeffrharr Jul 13, 2015
70c31e4
Fix documentation, add more/clearer test cases
Jeffrharr Jul 16, 2015
aff08ff
Accidental insertion removed
Jeffrharr Jul 16, 2015
fa8c752
Correct test text
Jeffrharr Jul 16, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Correct test text
  • Loading branch information
Jeffrharr committed Jul 16, 2015
commit fa8c752d6a78d1c54717819f37ffe2a0f0ec48a5
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,8 @@ class StreamingContext private[streaming] (
zeroArgumentConstructor.get.newInstance()
} else {
throw new SparkException(
s"Exception when registering Streaming Listener:" +
" $className did not have a zero-argument constructor or a" +
"Exception when registering Streaming Listener:" +
s" $className did not have a zero-argument constructor or a" +
" single-argument constructor that accepts SparkConf. Note: if the class is" +
" defined inside of another Scala class, then its constructors may accept an" +
" implicit parameter that references the enclosing class; in this case, you must" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
test("registering listeners via spark.streaming.extraListeners") {
// Test for success with zero-argument constructor and sparkConf constructor
val conf = new SparkConf().setMaster("local").setAppName("test")
.set("spark.streaming.extraListeners", classOf[StreamingListenerThatAcceptsSparkConf].getName + "," +
.set("spark.streaming.extraListeners",
classOf[StreamingListenerThatAcceptsSparkConf].getName + "," +
classOf[ReceiverInfoCollector].getName)
val scc = new StreamingContext(conf, Seconds(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can lead to a leak in the StreamingContext and underlying SparkContext. Please put the whole in

var ssc: StremaingContext = null
try {
  ssc = new StreamingContext...

} finally {
   if (ssc != null) {
     ssc.stop()
   }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamingListenerSuite extends TestSuiteBase, so I think we can use the withStreamingContext(ssc) { } construct to manage cleanup for us: https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala#L270

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still the case when the context is never started?

On Fri, Jul 17, 2015 at 1:55 AM, Tathagata Das [email protected]
wrote:

In
streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
#6380 (comment):

@@ -140,6 +141,37 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
true
}
+

  • test("registering listeners via spark.streaming.extraListeners") {
  • // Test for success with zero-argument constructor and sparkConf constructor
  • val conf = new SparkConf().setMaster("local").setAppName("test")
  •  .set("spark.streaming.extraListeners",
    
  •    classOf[StreamingListenerThatAcceptsSparkConf].getName + "," +
    
  •    classOf[ReceiverInfoCollector].getName)
    
  • val scc = new StreamingContext(conf, Seconds(1))

This can lead to a leak in the StreamingContext and underlying
SparkContext. Please put the whole in

var ssc: StremaingContext = null
try {
ssc = new StreamingContext...

} finally {
if (ssc != null) {
ssc.stop()
}
}


Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/6380/files#r34868938.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the StreamingContext is being created, which means an underlying SparkContext is being created, which is not being cleaned up.


scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[StreamingListenerThatAcceptsSparkConf] }
scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[ReceiverInfoCollector] }
scc.scheduler.listenerBus.listeners.exists {
_.isInstanceOf[StreamingListenerThatAcceptsSparkConf] }
scc.scheduler.listenerBus.listeners.exists {
_.isInstanceOf[ReceiverInfoCollector] }

// Test for failure with too many arguments in constructor
val failingConf = new SparkConf().setMaster("local").setAppName("failingTest")
Expand All @@ -159,8 +162,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
val failingScc = new StreamingContext(failingConf, Seconds(1))
}
val expectedErrorMessage =
s"Exception when registering Streaming Listener:" +
" StreamingListenerTooManyArguments did not have a zero-argument constructor or a" +
"Exception when registering Streaming Listener:" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets not be so specific, the exact text can change. Just verify whether the message has the name of the class inside it.

" org.apache.spark.streaming.StreamingListenerTooManyArguments" +
" did not have a zero-argument constructor or a" +
" single-argument constructor that accepts SparkConf. Note: if the class is" +
" defined inside of another Scala class, then its constructors may accept an" +
" implicit parameter that references the enclosing class; in this case, you must" +
Expand Down