Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-23637][YARN]Yarn might allocate more resource if a same execut…
…or is killed multiple times.
  • Loading branch information
jinxing committed Mar 9, 2018
commit bd6f8a1dd608d95f6faf2758cbd36e6328f48eb3
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,11 @@ private[yarn] class YarnAllocator(
* Request that the ResourceManager release the container running the specified executor.
*/
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning.decrementAndGet()
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
executorIdToContainer.get(executorId) match {
case Some(container) if !releasedContainers.contains(container.getId) =>
internalReleaseContainer(container)
numExecutorsRunning.decrementAndGet()
case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getPendingAllocate.size should be (1)
}

test("kill same executor multiple times") {
val handler = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.getNumExecutorsRunning should be (2)
handler.getPendingAllocate.size should be (0)

val executorToKill = handler.executorIdToContainer.keys.head
handler.killExecutor(executorToKill)
handler.getNumExecutorsRunning should be (1)
handler.killExecutor(executorToKill)
handler.killExecutor(executorToKill)
handler.killExecutor(executorToKill)
handler.getNumExecutorsRunning should be (1)
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (1)
}

test("lost executor removed from backend") {
val handler = createAllocator(4)
handler.updateResourceRequests()
Expand Down