-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-11761] Prevent the call to StreamingContext#stop() in the listener bus's thread #9741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
bc40285
8f583b9
abab461
d67a133
e0c6163
8ff4c61
4ef7de2
922bf0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, Synch | |
| import scala.concurrent.Future | ||
| import scala.concurrent.ExecutionContext.Implicits.global | ||
|
|
||
| import org.apache.spark.SparkException | ||
| import org.apache.spark.storage.StorageLevel | ||
| import org.apache.spark.streaming.dstream.DStream | ||
| import org.apache.spark.streaming.receiver.Receiver | ||
|
|
@@ -161,6 +162,14 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { | |
| } | ||
| } | ||
|
|
||
| test("don't call ssc.stop in listener") { | ||
| ssc = new StreamingContext("local[2]", "ssc", Milliseconds(1000)) | ||
| val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) | ||
| inputStream.foreachRDD(_.count) | ||
|
|
||
| val failureReasons = startStreamingContextAndCallStop(ssc) | ||
| } | ||
|
|
||
| test("onBatchCompleted with successful batch") { | ||
| ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) | ||
| val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) | ||
|
|
@@ -207,6 +216,17 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { | |
| assert(failureReasons(1).contains("This is another failed job")) | ||
| } | ||
|
|
||
| private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = { | ||
| val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc) | ||
| _ssc.addStreamingListener(contextStoppingCollector) | ||
| val batchCounter = new BatchCounter(_ssc) | ||
| _ssc.start() | ||
| // Make sure running at least one batch | ||
| batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000) | ||
| _ssc.stop() | ||
| assert(contextStoppingCollector.sparkExSeen) | ||
| } | ||
|
|
||
| private def startStreamingContextAndCollectFailureReasons( | ||
| _ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = { | ||
| val failureReasonsCollector = new FailureReasonsCollector() | ||
|
|
@@ -320,3 +340,17 @@ class FailureReasonsCollector extends StreamingListener { | |
| } | ||
| } | ||
| } | ||
| /** | ||
| * A StreamingListener that calls StreamingContext.stop(). | ||
| */ | ||
| class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { | ||
| var sparkExSeen = false | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { | ||
| try { | ||
| ssc.stop() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the listener bus will just log the exception. You can catch the exception here and use a field to store it. Then you can assert the exception in the test. |
||
| } catch { | ||
| case se: SparkException => | ||
| sparkExSeen = true | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove
val failureReasons =