Skip to content

Commit 66e83d7

Browse files
Ngone51 authored and kai-chi committed
[SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account
## What changes were proposed in this pull request? Right now, we cancel pending allocate requests by its sending order. I think we can take locality preference into account when doing this to perform least impact on task locality preference. ## How was this patch tested? N.A. Closes apache#23344 from Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account. Authored-by: Ngone51 <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]> (cherry picked from commit 3d6b44d) Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent d4de2e7 commit 66e83d7

File tree

1 file changed

+12
-17
lines changed

1 file changed

+12
-17
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -305,21 +305,21 @@ private[yarn] class YarnAllocator(
305305
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
306306
s"executorsStarting: ${numExecutorsStarting.get}")
307307

308+
// Split the pending container request into three groups: locality matched list, locality
309+
// unmatched list and non-locality list. Take the locality matched container request into
310+
// consideration of container placement, treat as allocated containers.
311+
// For locality unmatched and locality free container requests, cancel these container
312+
// requests, since required locality preference has been changed, recalculating using
313+
// container placement strategy.
314+
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
315+
hostToLocalTaskCounts, pendingAllocate)
316+
308317
if (missing > 0) {
309318
logInfo(s"Will request $missing executor container(s), each with " +
310319
s"${resource.getVirtualCores} core(s) and " +
311320
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead) and " +
312321
s"${resource.getGPUs} GPU(s)")
313322

314-
// Split the pending container request into three groups: locality matched list, locality
315-
// unmatched list and non-locality list. Take the locality matched container request into
316-
// consideration of container placement, treat as allocated containers.
317-
// For locality unmatched and locality free container requests, cancel these container
318-
// requests, since required locality preference has been changed, recalculating using
319-
// container placement strategy.
320-
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
321-
hostToLocalTaskCounts, pendingAllocate)
322-
323323
// cancel "stale" requests for locations that are no longer needed
324324
staleRequests.foreach { stale =>
325325
amClient.removeContainerRequest(stale)
@@ -379,14 +379,9 @@ private[yarn] class YarnAllocator(
379379
val numToCancel = math.min(numPendingAllocate, -missing)
380380
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
381381
s"total $targetNumExecutors executors.")
382-
383-
val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
384-
if (!matchingRequests.isEmpty) {
385-
matchingRequests.iterator().next().asScala
386-
.take(numToCancel).foreach(amClient.removeContainerRequest)
387-
} else {
388-
logWarning("Expected to find pending requests, but found none.")
389-
}
382+
// cancel pending allocate requests by taking locality preference into account
383+
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
384+
cancelRequests.foreach(amClient.removeContainerRequest)
390385
}
391386
}
392387

0 commit comments

Comments
 (0)