Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add test
  • Loading branch information
tedyu committed Nov 18, 2015
commit d67a133017467a9ef17fa4ed331b6c2731b5482b
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, Synch
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.SparkException
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
Expand Down Expand Up @@ -171,6 +172,14 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
"A successful batch should not set errorMessage")
}

test("don't call ssc.stop in listener") {
ssc = new StreamingContext("local[2]", "ssc", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)

val failureReasons = startStreamingContextAndCallStop(ssc)
}

test("onBatchCompleted with failed batch and one failed job") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
Expand Down Expand Up @@ -207,6 +216,19 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
assert(failureReasons(1).contains("This is another failed job"))
}

private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = {
val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc)
_ssc.addStreamingListener(contextStoppingCollector)
val batchCounter = new BatchCounter(_ssc)
_ssc.start()
// Make sure running at least one batch
batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
intercept[SparkException] {
_ssc.awaitTerminationOrTimeout(10000)
}
_ssc.stop()
}

private def startStreamingContextAndCollectFailureReasons(
_ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = {
val failureReasonsCollector = new FailureReasonsCollector()
Expand Down Expand Up @@ -320,3 +342,12 @@ class FailureReasonsCollector extends StreamingListener {
}
}
}
/**
* A StreamingListener that calls StreamingContext.stop().
*/
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
override def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
ssc.stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the listener bus will just log the exception. You can catch the exception here and use a field to store it. Then you can assert the exception in the test.

}
}