addressed comments
ArtRand committed Aug 10, 2017
commit 641bdadabdb070e0308fb00625cff2ab5efd064f

CoarseGrainedSchedulerBackend.scala
@@ -46,11 +46,10 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
*/
private[spark]
class CoarseGrainedSchedulerBackend(
Contributor: You're not changing anything here now, are you?
-    scheduler: TaskSchedulerImpl,
-    val rpcEnv: RpcEnv,
-    hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager])
-  extends ExecutorAllocationClient with SchedulerBackend with Logging
-{
+  scheduler: TaskSchedulerImpl,
Contributor: nit: one more indent level.
+  val rpcEnv: RpcEnv)
+  extends ExecutorAllocationClient with SchedulerBackend with Logging {

// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
protected val totalCoreCount = new AtomicInteger(0)
// Total number of executors that are currently registered
@@ -102,6 +101,9 @@ class CoarseGrainedSchedulerBackend(
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

+  // hadoop token manager used by some sub-classes (e.g. Mesos)
+  protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None

// Hadoop delegation tokens to be sent to the executors.
private val hadoopDelegationCreds = getHadoopDelegationCreds()

@@ -689,7 +691,7 @@ class CoarseGrainedSchedulerBackend(
}

private def getHadoopDelegationCreds(): Option[Array[Byte]] = {
-    if (UserGroupInformation.isSecurityEnabled) {
+    if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
hadoopDelegationTokenManager.map { manager =>
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
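
Taken together, the hunks above replace the hadoopDelegationTokenManager constructor parameter with a protected val that defaults to None, so a back end now supplies a token manager by overriding that val rather than passing it in. A minimal sketch of the pattern, using made-up names rather than the real Spark classes:

// Sketch only: the override-a-protected-val pattern from the diff above,
// with hypothetical types standing in for the Spark classes.
trait TokenManager {
  def obtainTokens(): Array[Byte]
}

abstract class BaseBackend {
  // Defaults to None, as in CoarseGrainedSchedulerBackend; back ends that
  // do not need delegation tokens simply inherit this.
  protected val delegationTokenManager: Option[TokenManager] = None

  // A def is evaluated on each call, after subclasses have initialized
  // their override of delegationTokenManager.
  protected def delegationCreds: Option[Array[Byte]] =
    delegationTokenManager.map(_.obtainTokens())
}

class MesosLikeBackend extends BaseBackend {
  // A back end that needs tokens overrides the val, as the Mesos diff below does.
  override protected val delegationTokenManager: Option[TokenManager] =
    Some(new TokenManager {
      override def obtainTokens(): Array[Byte] = Array.empty[Byte]
    })
}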

StandaloneSchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class StandaloneSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
masters: Array[String])
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with StandaloneAppClientListener
with Logging {


MesosCoarseGrainedSchedulerBackend.scala
@@ -56,13 +56,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
securityManager: SecurityManager)
extends CoarseGrainedSchedulerBackend(
scheduler,
-    sc.env.rpcEnv,
-    Some(new HadoopDelegationTokenManager(
-      sc.conf,
-      sc.hadoopConfiguration)))
+    sc.env.rpcEnv)
with org.apache.mesos.Scheduler
with MesosSchedulerUtils {

+  override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
Contributor: protected
+    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))

// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2

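
Picking up the reviewer's "protected" note: since the base declaration in CoarseGrainedSchedulerBackend is protected, the override above would presumably become:

  override protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))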

YarnSchedulerBackend.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
private[spark] abstract class YarnSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None) {
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

private val stopped = new AtomicBoolean(false)
