Skip to content

Commit daeb081

Browse files
Ngone51Marcelo Vanzin
authored andcommitted
[SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account
## What changes were proposed in this pull request? Right now, we cancel pending allocate requests by its sending order. I thing we can take locality preference into account when do this to perfom least impact on task locality preference. ## How was this patch tested? N.A. Closes #23344 from Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account. Authored-by: Ngone51 <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]> (cherry picked from commit 3d6b44d) Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 74c1cd1 commit daeb081

File tree

1 file changed

+12
-17
lines changed

1 file changed

+12
-17
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -287,20 +287,20 @@ private[yarn] class YarnAllocator(
287287
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
288288
s"executorsStarting: ${numExecutorsStarting.get}")
289289

290+
// Split the pending container request into three groups: locality matched list, locality
291+
// unmatched list and non-locality list. Take the locality matched container request into
292+
// consideration of container placement, treat as allocated containers.
293+
// For locality unmatched and locality free container requests, cancel these container
294+
// requests, since required locality preference has been changed, recalculating using
295+
// container placement strategy.
296+
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
297+
hostToLocalTaskCounts, pendingAllocate)
298+
290299
if (missing > 0) {
291300
logInfo(s"Will request $missing executor container(s), each with " +
292301
s"${resource.getVirtualCores} core(s) and " +
293302
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
294303

295-
// Split the pending container request into three groups: locality matched list, locality
296-
// unmatched list and non-locality list. Take the locality matched container request into
297-
// consideration of container placement, treat as allocated containers.
298-
// For locality unmatched and locality free container requests, cancel these container
299-
// requests, since required locality preference has been changed, recalculating using
300-
// container placement strategy.
301-
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
302-
hostToLocalTaskCounts, pendingAllocate)
303-
304304
// cancel "stale" requests for locations that are no longer needed
305305
staleRequests.foreach { stale =>
306306
amClient.removeContainerRequest(stale)
@@ -360,14 +360,9 @@ private[yarn] class YarnAllocator(
360360
val numToCancel = math.min(numPendingAllocate, -missing)
361361
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
362362
s"total $targetNumExecutors executors.")
363-
364-
val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
365-
if (!matchingRequests.isEmpty) {
366-
matchingRequests.iterator().next().asScala
367-
.take(numToCancel).foreach(amClient.removeContainerRequest)
368-
} else {
369-
logWarning("Expected to find pending requests, but found none.")
370-
}
363+
// cancel pending allocate requests by taking locality preference into account
364+
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
365+
cancelRequests.foreach(amClient.removeContainerRequest)
371366
}
372367
}
373368

0 commit comments

Comments
 (0)