Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add feature to permanently blacklist a user-specified list of nodes, S…
  • Loading branch information
LucaCanali committed Aug 24, 2017
commit 4932d3767e3c3bb24a94a7422b7b8e6f7a5bf08b
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ private[scheduler] class BlacklistTracker (
*/
val nodeToBlacklistedExecs = new HashMap[String, HashSet[String]]()

/**
* Blacklists permanently the nodes listed in spark.blacklist.alwaysBlacklistedNodes
* The blacklist timeout is set to a large value, effectively never expiring.
*/
private val permanentlyBlacklistedNodes: Seq[String] =
conf.get("spark.blacklist.alwaysBlacklistedNodes", "").split(',').map(_.trim).filter(_ != "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The configuartion should goes to internal/config

if (permanentlyBlacklistedNodes.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make this a private function and init the val _nodeBlacklist with this.

val now = clock.getTimeMillis()
for (nodeName <- permanentlyBlacklistedNodes) {
nodeIdToBlacklistExpiryTime.put(nodeName, Long.MaxValue)
listenerBus.post(SparkListenerNodeBlacklisted(now, nodeName, 0))
logWarning(s"Permanently blacklisted node $nodeName")
}
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}

/**
* Un-blacklists executors and nodes that have been blacklisted for at least
* BLACKLIST_TIMEOUT_MILLIS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,4 +585,16 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
}

test("Nodes can be permanently blacklisted, SPARK-21829") {
val blacklistedNodes = "hostA, hostB"
conf.set("spark.blacklist.alwaysBlacklistedNodes", blacklistedNodes)

val allocationClientMock = mock[ExecutorAllocationClient]
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
for (nodeName <- blacklistedNodes.split(',').map(_.trim)) {
assert(blacklist.nodeIdToBlacklistExpiryTime.contains(nodeName))
(blacklist.nodeIdToBlacklistExpiryTime.get(nodeName) === Long.MaxValue)
}
}
}
11 changes: 11 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,17 @@ Apart from these, the following properties are also available, and may be useful
blacklisted.
</td>
</tr>
<tr>
<td><code>spark.blacklist.alwaysBlacklistedNodes</code></td>
<td>(none)</td>
<td>
A comma-separated list of cluster nodes that will be put in the scheduler blacklist at the start of the Spark Context.
These nodes are permanently blacklisted and are exempt from the spark.blacklist.timeout mechanism.
If the cluster manager allocates executors on nodes in the blacklist, they will be rejected by the scheduler.
This feature can be used to prevent running executors/tasks on a user-specified list of cluster nodes.
Dependency: requires spark.blacklist.enabled=true
</td>
</tr>
<tr>
<td><code>spark.speculation</code></td>
<td>false</td>
Expand Down