Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.Map
import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -576,18 +577,26 @@ class StreamingContext private[streaming] (
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
validate()
startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
scheduler.start()
uiTab.foreach(_.attach())
state = StreamingContextState.ACTIVE
try {
validate()
scheduler.start()
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
eventLoop.post(ErrorReported(msg, e))
}

def isStarted(): Boolean = synchronized {
eventLoop != null
}

private def processEvent(event: JobSchedulerEvent) {
try {
event match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(StreamingContext.getActive().isEmpty)
}

test("start failure should stop internal components") {
ssc = new StreamingContext(conf, batchDuration)
val inputStream = addInputStream(ssc)
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some(values.sum + state.getOrElse(0))
}
inputStream.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
// Require that the start fails because checkpoint directory was not set
intercept[Exception] {
ssc.start()
}
assert(ssc.getState() === StreamingContextState.STOPPED)
assert(ssc.scheduler.isStarted === false)
}


test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
Expand Down