Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ private[yarn] class YarnAllocator(
private val releasedContainers = Collections.newSetFromMap[ContainerId](
new ConcurrentHashMap[ContainerId, java.lang.Boolean])

private val numExecutorsRunning = new AtomicInteger(0)
private val runningExecutors = Collections.newSetFromMap[String](
new ConcurrentHashMap[String, java.lang.Boolean]())

private val numExecutorsStarting = new AtomicInteger(0)

Expand Down Expand Up @@ -166,7 +167,7 @@ private[yarn] class YarnAllocator(
clock = newClock
}

def getNumExecutorsRunning: Int = numExecutorsRunning.get()
def getNumExecutorsRunning: Int = runningExecutors.size()

def getNumExecutorsFailed: Int = synchronized {
val endTime = clock.getTimeMillis()
Expand Down Expand Up @@ -242,12 +243,11 @@ private[yarn] class YarnAllocator(
* Request that the ResourceManager release the container running the specified executor.
*/
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning.decrementAndGet()
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
executorIdToContainer.get(executorId) match {
case Some(container) if !releasedContainers.contains(container.getId) =>
internalReleaseContainer(container)
runningExecutors.remove(executorId)
case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
}
}

Expand All @@ -274,7 +274,7 @@ private[yarn] class YarnAllocator(
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
numExecutorsRunning.get,
runningExecutors.size,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))

Expand All @@ -286,7 +286,7 @@ private[yarn] class YarnAllocator(
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning.get))
.format(completedContainers.size, runningExecutors.size))
}
}

Expand All @@ -300,9 +300,9 @@ private[yarn] class YarnAllocator(
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - numExecutorsRunning.get
numExecutorsStarting.get - runningExecutors.size
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
s"executorsStarting: ${numExecutorsStarting.get}")

if (missing > 0) {
Expand Down Expand Up @@ -502,7 +502,7 @@ private[yarn] class YarnAllocator(
s"for executor with ID $executorId")

def updateInternalState(): Unit = synchronized {
numExecutorsRunning.incrementAndGet()
runningExecutors.add(executorId)
numExecutorsStarting.decrementAndGet()
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
Expand All @@ -513,7 +513,7 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.put(containerId, executorHostname)
}

if (numExecutorsRunning.get < targetNumExecutors) {
if (runningExecutors.size() < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
if (launchContainers) {
launcherPool.execute(new Runnable {
Expand Down Expand Up @@ -554,7 +554,7 @@ private[yarn] class YarnAllocator(
} else {
logInfo(("Skip launching executorRunnable as running executors count: %d " +
"reached target executors count: %d.").format(
numExecutorsRunning.get, targetNumExecutors))
runningExecutors.size, targetNumExecutors))
}
}
}
Expand All @@ -569,7 +569,11 @@ private[yarn] class YarnAllocator(
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning.decrementAndGet()
containerIdToExecutorId.get(containerId) match {
case Some(executorId) => runningExecutors.remove(executorId)
case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
}

logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
}
handler.updateResourceRequests()
handler.processCompletedContainers(statuses.toSeq)
handler.processCompletedContainers(statuses)
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (1)
}

test("kill same executor multiple times") {
val handler = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.getNumExecutorsRunning should be (2)
handler.getPendingAllocate.size should be (0)

val executorToKill = handler.executorIdToContainer.keys.head
handler.killExecutor(executorToKill)
handler.getNumExecutorsRunning should be (1)
handler.killExecutor(executorToKill)
handler.killExecutor(executorToKill)
handler.killExecutor(executorToKill)
handler.getNumExecutorsRunning should be (1)
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (1)
}

test("process same completed container multiple times") {
val handler = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.getNumExecutorsRunning should be (2)
handler.getPendingAllocate.size should be (0)

val statuses = Seq(container1, container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
}
handler.processCompletedContainers(statuses)
handler.getNumExecutorsRunning should be (0)

}

test("lost executor removed from backend") {
val handler = createAllocator(4)
handler.updateResourceRequests()
Expand All @@ -272,7 +316,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
}
handler.updateResourceRequests()
handler.processCompletedContainers(statuses.toSeq)
handler.processCompletedContainers(statuses)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)
Expand Down