SPARK-19755 declining offers from blacklisted slave by BlacklistTracker
IgorBerman committed Feb 21, 2018
commit e2ddc1be19e2f978df4fe84073aff3f5b46afe45
@@ -568,6 +568,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors < executorLimit &&
!scheduler.nodeBlacklist().contains(offerHostname) &&
Contributor

I just want to make really sure everybody understands the big change in behavior here -- nodeBlacklist() currently only gets updated based on failures in Spark tasks. If a Mesos task fails to even start -- that is, if a Spark executor fails to launch on a node -- nodeBlacklist does not get updated. So you could have a node that is misconfigured somehow, and even after this change you might end up repeatedly trying to launch executors on it, with the executor failing to start each time. That holds even if you have blacklisting on.

This is SPARK-16630 for the non-Mesos case. That is being actively worked on now -- however, the work there will probably have to be YARN-specific, so there will still be follow-up work to get the same thing for Mesos after that lands.
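To make that gap concrete, here is a tiny self-contained Scala sketch (hypothetical names, not Spark code) of the behavior described above: the blacklist is fed only by task failures, so a node whose executors keep dying before running any task is never filtered out by the new check.

```scala
// Hypothetical illustration, not Spark code: nodeBlacklist is updated only on
// *task* failures, so repeated executor launch failures never blacklist a host.
object BlacklistGapSketch {
  var nodeBlacklist: Set[String] = Set.empty            // simulated blacklist

  def onTaskFailed(host: String): Unit = nodeBlacklist += host
  def onExecutorLaunchFailed(host: String): Unit = ()   // no-op today (see SPARK-16630)

  def canLaunchOn(host: String): Boolean = !nodeBlacklist.contains(host)

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach(_ => onExecutorLaunchFailed("badhost"))  // misconfigured node
    println(canLaunchOn("badhost"))                           // true -- offers are still accepted
  }
}
```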

Contributor
@skonto May 19, 2018

@squito sounds reasonable. In the meantime we have to deal with a limitation on the Mesos side where the value is hardcoded, so we can move forward with this incrementally.

Member

maybe comment on this in the code here and add a JIRA for tracking?

Member

This check looks a little late. Can we decline faster, without calculating everything first?

meetsPortRequirements &&
satisfiesLocality(offerHostname)
}
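On the ordering point raised above, here is a minimal sketch (assumed, simplified signature -- not the PR's actual code) of evaluating the cheap blacklist lookup before any resource arithmetic, so offers from blacklisted hosts short-circuit immediately.

```scala
// Simplified, hypothetical predicate: the Set lookup comes first so a
// blacklisted host is rejected before any resource checks run.
object DeclineEarlySketch {
  final case class Offer(hostname: String, mem: Long, cpus: Int)

  def canLaunchTask(offer: Offer,
                    nodeBlacklist: Set[String],
                    executorMemory: Long,
                    coresNeeded: Int): Boolean =
    !nodeBlacklist.contains(offer.hostname) &&  // O(1), checked first
      offer.mem >= executorMemory &&
      offer.cpus >= coresNeeded

  def main(args: Array[String]): Unit = {
    val blacklist = Set("badhost")
    println(canLaunchTask(Offer("badhost", 8192, 4), blacklist, 4096, 2))  // false, declined immediately
    println(canLaunchTask(Offer("goodhost", 8192, 4), blacklist, 4096, 2)) // true
  }
}
```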
@@ -792,4 +793,4 @@ object IdHelper {
// Use atomic values since Spark contexts can be initialized in parallel
private[mesos] val nextSCNumber = new AtomicLong(0)
private[mesos] val startedBefore = new AtomicBoolean(false)
}
}
@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o2")
}

test("mesos declines offers from blacklisted slave") {
setBackend()

// launches a task on a valid offer on slave s1
val minMem = backend.executorMemory(sc) + 1024
val minCpu = 4
val offer1 = Resources(minMem, minCpu)
offerResources(List(offer1))
verifyTaskLaunched(driver, "o1")

// the executor (i.e., the Mesos task) fails on s1 for some reason
val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
backend.statusUpdate(driver, status)
when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))
Contributor

Just to reiterate my point above -- in many cases, having an executor fail will not lead to taskScheduler.nodeBlacklist() changing the way you are simulating here.


val offer2 = Resources(minMem, minCpu)
// Offer resources from the same slave
offerResources(List(offer2))
// but since it's blacklisted the offer is declined
verifyDeclinedOffer(driver, createOfferId("o1"))
Contributor

Will this actually pass? I thought it wouldn't, because the filtering is done inside buildMesosTasks, which never calls declineOffer on offers that fail canLaunchTask. (That is a separate thing which needs fixing -- you could open another issue for it.)

Contributor

Ah, never mind. I took another look at the code and now I see how this works.

}
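For readers following the thread above, here is a rough self-contained sketch (illustrative names, not the backend's exact code) of the control flow that makes the decline assertion pass: an offer that yields no tasks is still declined by the offer-handling loop, even though the blacklist filtering itself happens while the tasks are being built.

```scala
// Illustrative sketch of the offer-handling flow; names are assumptions.
object OfferFlowSketch {
  final case class Offer(id: String, hostname: String)

  // Builds no tasks for a blacklisted host (stand-in for canLaunchTask failing).
  def buildTasks(offer: Offer, nodeBlacklist: Set[String]): Seq[String] =
    if (nodeBlacklist.contains(offer.hostname)) Seq.empty
    else Seq(s"task-for-${offer.id}")

  def handleOffers(offers: Seq[Offer], nodeBlacklist: Set[String]): Unit =
    offers.foreach { offer =>
      val tasks = buildTasks(offer, nodeBlacklist)
      if (tasks.nonEmpty) println(s"launchTasks(${offer.id})")
      else println(s"declineOffer(${offer.id})")   // declined here, not inside buildTasks
    }

  def main(args: Array[String]): Unit =
    handleOffers(Seq(Offer("o1", "hosts1")), nodeBlacklist = Set("hosts1"))
    // prints: declineOffer(o1)
}
```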

test("mesos supports spark.executor.cores") {
val executorCores = 4
setBackend(Map("spark.executor.cores" -> executorCores.toString))
@@ -790,6 +812,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
when(taskScheduler.nodeBlacklist()).thenReturn(Set[String]())

externalShuffleClient = mock[MesosExternalShuffleClient]
