[SPARK-16742] Mesos Kerberos Support #18519
Conversation
Test build #79111 has finished for PR 18519 at commit

Test build #79112 has finished for PR 18519 at commit
Not a big deal, but could we fix the PR title to be a bit more descriptive?

Whoops, fixed.
vanzin left a comment
Did a quick first pass. Is there anything here that's unit testable?
     * a layer over the different cluster managers and deploy modes that Spark supports.
     */
    -object SparkSubmit extends CommandLineUtils {
    +object SparkSubmit extends CommandLineUtils with Logging {
You can't do this. This breaks the logging configuration of spark-shell and other shells (which is WARN by default instead of INFO).
    import org.apache.hadoop.security.Credentials

    class CredentialsSerializer {
private[spark]? Also feels like this could be just a couple of methods in SparkHadoopUtil instead of a new separate class.
    }

    def deserialize(tokenBytes: Array[Byte]): Credentials = {
      val tokensBuf = new java.io.ByteArrayInputStream(tokenBytes)
Why not import it like other classes?
    logInfo(s"Adding ${creds.numberOfTokens()} tokens and ${creds.numberOfSecretKeys()} secret" +
      s"keys to the current user's credentials.")

    UserGroupInformation.getCurrentUser().addCredentials(creds)
This looks like SparkHadoopUtil.addCurrentUserCredentials. That's only implemented on YarnSparkHadoopUtil for historical reasons, but since we dropped Hadoop 1.x support, the implementation can move to core/ now, and you'd avoid this copy of that code.
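For context, the round-trip being discussed can be sketched with Hadoop's own token-storage stream methods. This is a hedged illustration, not the PR's actual code; the object name `CredentialUtils` and the helper `addToCurrentUser` are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Hypothetical helper sketching the serialize/deserialize round-trip the
// reviewer suggests folding into SparkHadoopUtil.
object CredentialUtils {

  // Write all tokens and secret keys into a byte array using Hadoop's
  // built-in token-storage format.
  def serialize(creds: Credentials): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    val dataStream = new DataOutputStream(byteStream)
    creds.writeTokenStorageToStream(dataStream)
    dataStream.flush()
    byteStream.toByteArray
  }

  // Reverse of serialize(): rebuild a Credentials object from the bytes.
  def deserialize(tokenBytes: Array[Byte]): Credentials = {
    val tokensBuf = new DataInputStream(new ByteArrayInputStream(tokenBytes))
    val creds = new Credentials()
    creds.readTokenStorageStream(tokensBuf)
    creds
  }

  // Merge deserialized tokens into the current UGI, as the quoted diff does.
  def addToCurrentUser(tokenBytes: Array[Byte]): Unit = {
    UserGroupInformation.getCurrentUser().addCredentials(deserialize(tokenBytes))
  }
}
```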
    private[spark]
    -class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
    +class CoarseGrainedSchedulerBackend(
    +  scheduler: TaskSchedulerImpl,
nit: one more indent level.
    "messages.")))
    }
nit: not needed
    * of requesting a delta of executors risks double counting new executors when there are
    * insufficient resources to satisfy the first request. We make the assumption here that the
    * cluster manager will eventually fulfill all requests when resources free up.
    *
nit: leave these as they were.
    sc.env.rpcEnv,
    Some(new HadoopDelegationTokenManager(
      sc.conf,
      SparkHadoopUtil.get.newConfiguration(sc.conf))))
sc.hadoopConfiguration?
Hello @vanzin, I'm taking over Michael's Spark duties at Mesosphere and will be addressing the comments on this PR. Should be able to have the revisions done in the next day or so. Thanks for the patience.
Test build #79842 has finished for PR 18519 at commit

Test build #79847 has finished for PR 18519 at commit

Test build #79849 has finished for PR 18519 at commit

Test build #79914 has finished for PR 18519 at commit
@vanzin all green, ready for a check.

@vanzin any thoughts on this and the related discussion?

I've been pretty busy, probably can get to this sometime next week.
    val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
    val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
    // scalastyle:off println
    printStream.println(s"Setting ${key} to ${shortUserName}")
Do you want this to be printed out every time someone runs spark-submit? Sounds a bit noisy.
It only prints when UserGroupInformation.isSecurityEnabled and I think it's useful information whenever a job is run.
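A minimal sketch of the behavior being defended, assuming the surrounding spark-submit code (the `printStream`, `key`, and `shortUserName` values come from the lines quoted above):

```scala
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Only emitted on secure (Kerberos-enabled) clusters, so the message is
// not printed for every spark-submit invocation -- just kerberized ones.
if (UserGroupInformation.isSecurityEnabled()) {
  val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
  val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
  // scalastyle:off println
  printStream.println(s"Setting ${key} to ${shortUserName}")
  // scalastyle:on println
}
```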
    scheduler: TaskSchedulerImpl,
    val rpcEnv: RpcEnv,
    hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager])
      extends ExecutorAllocationClient with SchedulerBackend with Logging
nit: unindent this line one level.
    class CoarseGrainedSchedulerBackend(
      scheduler: TaskSchedulerImpl,
      val rpcEnv: RpcEnv,
      hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager])
I'm a little torn on having this as a constructor argument. It seems cleaner at first, but it kind of makes the constructors of sub-classes (like the Mesos one) ugly.

How about having a protected val hadoopDelegationTokenManager = None and overriding it where needed? That makes initialization in the sub-class more readable.
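The suggested shape can be sketched roughly like this. This is a hedged sketch of the proposal, not the PR's final code; constructor parameters are trimmed for brevity.

```scala
// Base class: no extra constructor parameter, just an overridable member.
private[spark] class CoarseGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging {

  // Default: no token manager. Cluster managers that need one override this.
  protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
}

// Mesos subclass opts in without threading an argument through every
// constructor in the hierarchy.
private[spark] class MesosCoarseGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  override protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
}
```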
@vanzin Fixed this up. Please have a look. Thanks.
    -class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
    -  extends ExecutorAllocationClient with SchedulerBackend with Logging
    -{
    +class CoarseGrainedSchedulerBackend(
You're not changing anything here now, are you?
      <scope>test</scope>
    </dependency>

    <dependency>
Is this really needed?
I don't see you adding specific tests for this, so wonder why you need the explicit dependency when other modules that depend on spark-core don't.
Yes, MesosClusterManagerSuite creates a MesosCoarseGrainedSchedulerBackend which contains a HadoopDelegationTokenManager... etc.
Hmm, ok... the credential manager code should be safe when Hive classes aren't present, but if there's a problem in that area it's not your fault.
    override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
      Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))

    override val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
No need to override this guy.
    with org.apache.mesos.Scheduler
    with MesosSchedulerUtils {

    override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
protected
Hello @vanzin, thanks for the reviews; I believe I've addressed your comments. I also added support for the user to pass a ticket-granting ticket (TGT) instead of a keytab. It's a very small change and I tested it against kerberized HDFS. Thanks.
Test build #80551 has finished for PR 18519 at commit
It'd be better to avoid adding new features after the patch has been reviewed and is mostly ready to be checked in; the feature you added here isn't necessary.
Test build #80552 has finished for PR 18519 at commit

Test build #80565 has finished for PR 18519 at commit

Test build #80566 has finished for PR 18519 at commit

Test build #80573 has finished for PR 18519 at commit
Hello @vanzin, point taken. I reverted the change. The envvar trick works also, thanks for that.
Test build #80574 has finished for PR 18519 at commit
    @volatile protected var currentExecutorIdCounter = 0

    // hadoop token manager used by some sub-classes (e.g. Mesos)
    protected var hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
This should be a val. Just override it in the subclass.
    protected var hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None

    // Hadoop delegation tokens to be sent to the executors.
    protected var hadoopDelegationCreds: Option[Array[Byte]] = None
This should be a val; you don't need to set it in the subclass. Because there might be some initialization order issue, it might need to be a lazy val.
So, another option here is to have this be a val and have hadoopDelegationTokenManager be a def. The latter means there's no initialization order issue (so no need for the lazy val hack). Since this is really only done once, that should be fine.
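The initialization-order point can be illustrated with a stripped-down, hypothetical example, where plain Strings stand in for the real token manager and credentials:

```scala
class Backend {
  // If this were a plain val overridden in the subclass, the field would
  // still be uninitialized (null) while the superclass constructor runs.
  // As a def, the subclass override is dispatched correctly even during
  // superclass construction.
  protected def tokenManager: Option[String] = None

  // Evaluated once, during superclass construction -- safe because
  // tokenManager is a def, so no lazy-val hack is needed here.
  protected val creds: Option[Array[Byte]] = tokenManager.map(_.getBytes("UTF-8"))
}

class MesosBackend extends Backend {
  override protected def tokenManager: Option[String] = Some("hdfs-token")
}

// new MesosBackend() ends up with creds defined; with a plain val override
// of tokenManager, creds would have been computed from an unset field.
```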
LGTM pending tests. I'm not yet super happy with the internal API in

(I did some basic local testing on secure YARN, just in case, and it looks good.)

Test build #80802 has finished for PR 18519 at commit

Merging to master.

@ArtRand Any plans to add delegation token renewal under Mesos in the future?
Add Kerberos Support to Mesos. This includes kinit and --keytab support, but does not include delegation token renewal.

Manually against a Secure DC/OS Apache HDFS cluster.

Author: ArtRand <[email protected]>
Author: Michael Gummelt <[email protected]>

Closes apache#18519 from mgummelt/SPARK-16742-kerberos.
What changes were proposed in this pull request?
Add Kerberos Support to Mesos. This includes kinit and --keytab support, but does not include delegation token renewal.
How was this patch tested?
Manually against a Secure DC/OS Apache HDFS cluster.