Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.
Merged
Prev Previous commit
Next Next commit
Track retriedDrivers via HashSet
This frees us from parsing retry counts from strings and future proofs
us in the event taskId UUID ever changes
  • Loading branch information
samvantran committed Jan 17, 2019
commit 58f3f746ee15db8792323f90b9e65e17b9852bb0
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ private[spark] class MesosClusterScheduler(
private val pendingRecover = new mutable.HashMap[String, SlaveID]()
// Stores all the submitted drivers that hasn't been launched, keyed by submission id
private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
private val retriedDrivers = new mutable.HashSet[String]()
// All supervised drivers that are waiting to retry after termination, keyed by submission id
private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
private val queuedDriversState = engineFactory.createEngine("driverQueue")
Expand Down Expand Up @@ -381,11 +382,6 @@ private[spark] class MesosClusterScheduler(
desc.retryState.map(state => sId + s"${RETRY_SEP}${state.retries.toString}").getOrElse(sId)
}

private def getRetryCountFromTaskId(taskId: String): Int = {
val parts = taskId.split(s"${RETRY_SEP}")
if (parts.length > 1) parts.last.toInt else 0
}

private def getSubmissionIdFromTaskId(taskId: String): String = {
taskId.split(s"${RETRY_SEP}").head
}
Expand Down Expand Up @@ -770,8 +766,8 @@ private[spark] class MesosClusterScheduler(
val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
if (taskIsOutdated(taskId, state)) {
// Early return to avoid outdated task from overwriting a more recent status
if (taskIsOutdated(taskId)) {
// Prevent outdated task from overwriting a more recent status
return
}
removeFromLaunchedDrivers(subId)
Expand All @@ -796,10 +792,11 @@ private[spark] class MesosClusterScheduler(
}
}

private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean = {
if (getRetryCountFromTaskId(taskId) < getRetryCountFromTaskId(state.frameworkId)) {
private def taskIsOutdated(taskId: String): Boolean = {
if (retriedDrivers.contains(taskId)) {
true
} else {
retriedDrivers.add(taskId)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having state mutation logic in a boolean function which checks some condition looks a bit confusing

false
}
}
Expand Down