Skip to content

Commit 3d6b44d

Browse files
Ngone51Marcelo Vanzin
authored andcommitted
[SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account
## What changes were proposed in this pull request? Right now, we cancel pending allocate requests by its sending order. I thing we can take locality preference into account when do this to perfom least impact on task locality preference. ## How was this patch tested? N.A. Closes #23344 from Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account. Authored-by: Ngone51 <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 6692bac commit 3d6b44d

File tree

1 file changed

+12
-17
lines changed

1 file changed

+12
-17
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,15 @@ private[yarn] class YarnAllocator(
294294
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
295295
s"executorsStarting: ${numExecutorsStarting.get}")
296296

297+
// Split the pending container request into three groups: locality matched list, locality
298+
// unmatched list and non-locality list. Take the locality matched container request into
299+
// consideration of container placement, treat as allocated containers.
300+
// For locality unmatched and locality free container requests, cancel these container
301+
// requests, since required locality preference has been changed, recalculating using
302+
// container placement strategy.
303+
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
304+
hostToLocalTaskCounts, pendingAllocate)
305+
297306
if (missing > 0) {
298307
if (log.isInfoEnabled()) {
299308
var requestContainerMessage = s"Will request $missing executor container(s), each with " +
@@ -306,15 +315,6 @@ private[yarn] class YarnAllocator(
306315
logInfo(requestContainerMessage)
307316
}
308317

309-
// Split the pending container request into three groups: locality matched list, locality
310-
// unmatched list and non-locality list. Take the locality matched container request into
311-
// consideration of container placement, treat as allocated containers.
312-
// For locality unmatched and locality free container requests, cancel these container
313-
// requests, since required locality preference has been changed, recalculating using
314-
// container placement strategy.
315-
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
316-
hostToLocalTaskCounts, pendingAllocate)
317-
318318
// cancel "stale" requests for locations that are no longer needed
319319
staleRequests.foreach { stale =>
320320
amClient.removeContainerRequest(stale)
@@ -374,14 +374,9 @@ private[yarn] class YarnAllocator(
374374
val numToCancel = math.min(numPendingAllocate, -missing)
375375
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
376376
s"total $targetNumExecutors executors.")
377-
378-
val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
379-
if (!matchingRequests.isEmpty) {
380-
matchingRequests.iterator().next().asScala
381-
.take(numToCancel).foreach(amClient.removeContainerRequest)
382-
} else {
383-
logWarning("Expected to find pending requests, but found none.")
384-
}
377+
// cancel pending allocate requests by taking locality preference into account
378+
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
379+
cancelRequests.foreach(amClient.removeContainerRequest)
385380
}
386381
}
387382

0 commit comments

Comments
 (0)