Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
change the task number to count; change the WithLoads to WithTaskCount
  • Loading branch information
GraceH committed Nov 10, 2015
commit dc660f63c416c300bd3da48c6a0b9442633313fc
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ private[spark] class TaskSchedulerImpl(
val nextTaskId = new AtomicLong(0)

// Which executor IDs we have executors on
// each executor will record running or launched task number
val activeExecutorIdsWithLoads = new HashMap[String, Int]
// each executor will record running or launched task count
val activeExecutorIdsWithTaskCount = new HashMap[String, Int]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not your code, but please keep this private

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, I would rename it and update the comment

// Number of tasks running on each executor
private val executorIdToTaskCount = ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense.


// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
Expand Down Expand Up @@ -255,7 +255,7 @@ private[spark] class TaskSchedulerImpl(
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
activeExecutorIdsWithLoads(execId) += 1
activeExecutorIdsWithTaskCount(execId) += 1
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
Expand Down Expand Up @@ -284,7 +284,7 @@ private[spark] class TaskSchedulerImpl(
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
activeExecutorIdsWithLoads.getOrElseUpdate(o.executorId, 0)
activeExecutorIdsWithTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
Expand Down Expand Up @@ -334,7 +334,7 @@ private[spark] class TaskSchedulerImpl(
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)

if (activeExecutorIdsWithLoads.contains(execId)) {
if (activeExecutorIdsWithTaskCount.contains(execId)) {
removeExecutor(execId,
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
Expand All @@ -345,7 +345,7 @@ private[spark] class TaskSchedulerImpl(
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid) match {
case Some(execId) => activeExecutorIdsWithLoads(execId) -= 1
case Some(execId) => activeExecutorIdsWithTaskCount(execId) -= 1
case None =>
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taskIdToExecutorId.remove(tid).foreach { eid =>
  if (executorIdToTaskCount.contains(execId)) {
    executorIdToTaskCount(execId) -= 1
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change that.

}
Expand Down Expand Up @@ -468,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
var failedExecutor: Option[String] = None

synchronized {
if (activeExecutorIdsWithLoads.contains(executorId)) {
if (activeExecutorIdsWithTaskCount.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
removeExecutor(executorId, reason)
Expand All @@ -490,7 +490,7 @@ private[spark] class TaskSchedulerImpl(

/** Remove an executor from all our data structures and mark it as lost */
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIdsWithLoads -= executorId
activeExecutorIdsWithTaskCount -= executorId

val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
Expand Down Expand Up @@ -525,11 +525,11 @@ private[spark] class TaskSchedulerImpl(
}

def isExecutorAlive(execId: String): Boolean = synchronized {
activeExecutorIdsWithLoads.contains(execId)
activeExecutorIdsWithTaskCount.contains(execId)
}

def isExecutorBusy(execId: String): Boolean = synchronized {
activeExecutorIdsWithLoads.getOrElse(execId, -1) > 0
activeExecutorIdsWithTaskCount.getOrElse(execId, -1) > 0
}

// By default, rack is unknown
Expand Down