Skip to content

Commit c43b8ae

Browse files
committed
Added graceful shutdown to Spark Streaming.
1 parent 345825d commit c43b8ae

File tree

14 files changed

+522
-196
lines changed

14 files changed

+522
-196
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,19 +194,19 @@ class CheckpointWriter(
194194
}
195195
}
196196

197-
def stop() {
198-
synchronized {
199-
if (stopped) {
200-
return
201-
}
202-
stopped = true
203-
}
197+
def stop(): Unit = synchronized {
198+
if (stopped) return
199+
204200
executor.shutdown()
205201
val startTime = System.currentTimeMillis()
206202
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
203+
if (!terminated) {
204+
executor.shutdownNow()
205+
}
207206
val endTime = System.currentTimeMillis()
208207
logInfo("CheckpointWriter executor terminated ? " + terminated +
209208
", waited for " + (endTime - startTime) + " ms.")
209+
stopped = true
210210
}
211211

212212
private def fs = synchronized {

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ class StreamingContext private[streaming] (
158158

159159
private[streaming] val waiter = new ContextWaiter
160160

161+
/** Enumeration to identify current state of the StreamingContext */
162+
private[streaming] object ContextState extends Enumeration {
163+
type CheckpointState = Value
164+
val Initialized, Started, Stopped = Value
165+
}
166+
167+
import ContextState._
168+
private[streaming] var state = Initialized
169+
161170
/**
162171
* Return the associated Spark context
163172
*/
@@ -405,9 +414,18 @@ class StreamingContext private[streaming] (
405414
/**
406415
* Start the execution of the streams.
407416
*/
408-
def start() = synchronized {
417+
def start(): Unit = synchronized {
418+
// Throw exception if the context has already been started once
419+
// or if a stopped context is being started again
420+
if (state == Started) {
421+
throw new SparkException("StreamingContext has already been started")
422+
}
423+
if (state == Stopped) {
424+
throw new SparkException("StreamingContext has already been stopped")
425+
}
409426
validate()
410427
scheduler.start()
428+
state = Started
411429
}
412430

413431
/**
@@ -430,12 +448,27 @@ class StreamingContext private[streaming] (
430448
/**
431449
* Stop the execution of the streams.
432450
* @param stopSparkContext Stop the associated SparkContext or not
451+
* @param stopGracefully Stop gracefully by waiting for the processing of all
452+
* received data to be completed
433453
*/
434-
def stop(stopSparkContext: Boolean = true) = synchronized {
435-
scheduler.stop()
454+
def stop(
455+
stopSparkContext: Boolean = true,
456+
stopGracefully: Boolean = false
457+
): Unit = synchronized {
458+
// Silently warn if context is stopped twice, or context is stopped before starting
459+
if (state == Initialized) {
460+
logWarning("StreamingContext has not been started yet")
461+
return
462+
}
463+
if (state == Stopped) {
464+
logWarning("StreamingContext has already been stopped")
465+
return
466+
} // no need to throw an exception as its okay to stop twice
467+
scheduler.stop(stopGracefully)
436468
logInfo("StreamingContext stopped successfully")
437469
waiter.notifyStop()
438470
if (stopSparkContext) sc.stop()
471+
state = Stopped
439472
}
440473
}
441474

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,16 @@ class JavaStreamingContext(val ssc: StreamingContext) {
502502
* @param stopSparkContext Stop the associated SparkContext or not
503503
*/
504504
def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
505+
506+
/**
507+
* Stop the execution of the streams.
508+
* @param stopSparkContext Stop the associated SparkContext or not
509+
* @param stopGracefully Stop gracefully by waiting for the processing of all
510+
* received data to be completed
511+
*/
512+
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
513+
ssc.stop(stopSparkContext, stopGracefully)
514+
}
505515
}
506516

507517
/**

0 commit comments

Comments
 (0)