Skip to content

Commit c1a6126

Browse files
committed
Fix a race condition that startReceiver may happen before setting trackerState to Started
1 parent dd0614f commit c1a6126

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
469469
*/
470470
private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
471471
val receiverId = receiver.streamId
472-
if (!isTrackerStarted) {
472+
if (isTrackerStopping || isTrackerStopped) {
473473
onReceiverJobFinish(receiverId)
474474
return
475475
}
@@ -494,14 +494,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
494494
// We will keep restarting the receiver job until ReceiverTracker is stopped
495495
future.onComplete {
496496
case Success(_) =>
497-
if (!isTrackerStarted) {
497+
if (isTrackerStopping || isTrackerStopped) {
498498
onReceiverJobFinish(receiverId)
499499
} else {
500500
logInfo(s"Restarting Receiver $receiverId")
501501
self.send(RestartReceiver(receiver))
502502
}
503503
case Failure(e) =>
504-
if (!isTrackerStarted) {
504+
if (isTrackerStopping || isTrackerStopped) {
505505
onReceiverJobFinish(receiverId)
506506
} else {
507507
logError("Receiver has been stopped. Try to restart it.", e)

0 commit comments

Comments
 (0)