This repository was archived by the owner on Dec 4, 2024. It is now read-only.
Merged
Changes from 17 commits (43 commits total)
Commits:
5150951
WIP add tests and update manifest with spark.tgz
samvantran Jul 26, 2018
764e6c0
WIP fix cleanUpSubmitArgs to handle special chars and multiargs
samvantran Jul 26, 2018
746969a
Cleanup
samvantran Jul 26, 2018
62b542b
Add vendor pkg go-shellwords
samvantran Jul 26, 2018
a71e59f
Fix url
samvantran Jul 26, 2018
9f1dee7
More cleanup + gofmt
samvantran Jul 27, 2018
65a0f7c
Fix single quote error
samvantran Jul 27, 2018
8d347b9
Fix descrip
samvantran Jul 27, 2018
8336832
More fixes and tests
samvantran Jul 27, 2018
db177cd
Debug why single quote fails via spark run
samvantran Jul 29, 2018
c5df110
Fixes and cleanup
samvantran Jul 29, 2018
55412bc
gofmt
samvantran Jul 29, 2018
5a8e3f4
Comment out test, need to create app to print out options
samvantran Jul 30, 2018
8297693
Add simple app + test for CI
samvantran Jul 31, 2018
93f588a
Cleanup and fix test
samvantran Aug 1, 2018
96ac0a2
Fixes
samvantran Aug 1, 2018
3026e88
Cleanup test cases
samvantran Aug 2, 2018
7c294c7
Address PR comments
samvantran Aug 2, 2018
7b12b34
Fix expected test output
samvantran Aug 2, 2018
ff65589
Write confs to tempfile
samvantran Aug 7, 2018
29701c3
Forgot arg in parent function
samvantran Aug 7, 2018
376bf1b
Let's try escaping the quotes
samvantran Aug 8, 2018
f7ad435
Alternatively, wrap entire fn in file
samvantran Aug 8, 2018
6b32e56
Add function isSparkApp
samvantran Aug 14, 2018
c711c6c
Print out all system.properties in app
samvantran Aug 14, 2018
34dffbc
Run the actual file in test
samvantran Aug 14, 2018
d0a230e
Add run perms to tempfile
samvantran Aug 14, 2018
1afd177
Octals are different in python3
samvantran Aug 15, 2018
644c264
Subprocess.run needs shell=True
samvantran Aug 15, 2018
f267518
Sleep right after chmod (potentially old Docker bug)
samvantran Aug 15, 2018
e7035d0
Holy bejesus it finally works
samvantran Aug 16, 2018
9f86664
Cleanup, move logic to test_spark and revert spark_utils
samvantran Aug 16, 2018
0fef2c3
Simplify test_multi_arg_confs
samvantran Aug 17, 2018
39434da
Address PR comments
samvantran Aug 20, 2018
cfadbf1
Cleanup
samvantran Aug 21, 2018
9a69880
Oops, too hasty with the revert
samvantran Aug 21, 2018
36faae7
Merge branch 'master' into DCOS-38138-shell-escape
samvantran Aug 24, 2018
fbc86dc
Use spark distro 2.6.5 created from default
samvantran Aug 24, 2018
13bb7f0
Resync test.sh from dcos-commons: use DOCKER_IMAGE envvar
Aug 25, 2018
7865b6c
Skip test_jar test
samvantran Aug 28, 2018
0ca555a
Merge branch 'master' into DCOS-38138-shell-escape
samvantran Aug 28, 2018
d0aae3c
Remove checking for bool values
samvantran Aug 29, 2018
7b2bb6c
Move app extensions closer to method
samvantran Aug 30, 2018
14 changes: 7 additions & 7 deletions cli/dcos-spark/main.go
@@ -209,22 +209,22 @@ func (cmd *SparkCommand) runQuotaCreate(a *kingpin.Application, e *kingpin.Parse
requestValues := make([]quotaCreateGuarantee, 0)
if cmd.quotaCpus != 0 {
requestValues = append(requestValues, quotaCreateGuarantee{
Name: "cpus",
Type: "SCALAR",
Name: "cpus",
Type: "SCALAR",
Scalar: quotaScalar{Value: cmd.quotaCpus},
})
}
if cmd.quotaMem != 0 {
requestValues = append(requestValues, quotaCreateGuarantee{
Name: "mem",
Type: "SCALAR",
Name: "mem",
Type: "SCALAR",
Scalar: quotaScalar{Value: cmd.quotaMem},
})
}
if cmd.quotaGpus != 0 {
requestValues = append(requestValues, quotaCreateGuarantee{
Name: "gpus",
Type: "SCALAR",
Name: "gpus",
Type: "SCALAR",
Scalar: quotaScalar{Value: float64(cmd.quotaGpus)},
})
}
@@ -311,7 +311,7 @@ func handleCommands(app *kingpin.Application) {

run := app.Command("run", "Submit a job to the Spark Mesos Dispatcher").Action(cmd.runSubmit)
run.Flag("submit-args", fmt.Sprintf("Arguments matching what would be sent to 'spark-submit': %s",
-sparkSubmitHelp())).Required().PlaceHolder("ARGS").StringVar(&cmd.submitArgs)
+sparkSubmitHelp())).Required().PlaceHolder("\"ARGS\"").StringVar(&cmd.submitArgs)
// TODO this should be moved to submit args
run.Flag("docker-image", "Docker image to run the job within").
Default("").
20 changes: 9 additions & 11 deletions cli/dcos-spark/security.go
@@ -1,31 +1,30 @@
package main

import (
"fmt"
"errors"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"github.com/mesosphere/dcos-commons/cli/client"
"strings"
)


const KEYLENGTH = 128

var TASK_TYPES = []string{"driver", "executor"}

var SECRET_REFERENCE_PROPERTIES = map[string]string{
"driver": "spark.mesos.driver.secret.names",
"driver": "spark.mesos.driver.secret.names",
"executor": "spark.mesos.executor.secret.names",
}

var SECRET_FILENAME_PROPERTIES = map[string]string{
"driver": "spark.mesos.driver.secret.filenames",
"driver": "spark.mesos.driver.secret.filenames",
"executor": "spark.mesos.executor.secret.filenames",
}

var SECRET_ENVKEY_PROPERTIES = map[string]string{
"driver": "spark.mesos.driver.secret.envkeys",
"driver": "spark.mesos.driver.secret.envkeys",
"executor": "spark.mesos.executor.secret.envkeys",
}

@@ -34,7 +33,6 @@ const SPARK_KDC_PORT_KEY = "SPARK_SECURITY_KERBEROS_KDC_PORT"
const SPARK_KERBEROS_REALM_KEY = "SPARK_SECURITY_KERBEROS_REALM"
const SPARK_KERBEROS_KRB5_BLOB = "SPARK_MESOS_KRB5_CONF_BASE64"


// utility function used by SASL and Kerberos for user-defined secrets that may be base64 encoded blobs
// basically removes the prefix while ignoring the secret directory structure
func prepareBase64Secret(secretPath string) string {
@@ -44,7 +42,7 @@ func prepareBase64Secret(secretPath string) string {
}

absoluteSecretPath := strings.Split(secretPath, "/")
-filename := absoluteSecretPath[len(absoluteSecretPath) - 1]
+filename := absoluteSecretPath[len(absoluteSecretPath)-1]
// secrets with __dcos_base64__ will be decoded by Mesos, but remove the prefix here
if strings.HasPrefix(filename, "__dcos_base64__") {
return strings.TrimPrefix(filename, "__dcos_base64__")
@@ -92,7 +90,7 @@ func setupTLSArgs(args *sparkArgs) {
filenames := []string{keyStoreFileName}
envkeys := []string{"DCOS_SPARK_KEYSTORE"}

-if args.truststoreSecretPath != "" {  // optionally add the truststore configs also
+if args.truststoreSecretPath != "" { // optionally add the truststore configs also
addPropertyAndWarn(args, "spark.ssl.trustStore", trustStoreFileName)
setPropertyToDefaultIfNotSet(args, "spark.ssl.trustStorePassword", args.truststorePassword)
paths = append(paths, args.truststoreSecretPath)
@@ -258,8 +256,8 @@ func forwardEnvironmentVariablesFromMarathonConfig(args *sparkArgs, marathonJson

if kdcPropCount > 0 && kdcPropCount != 3 {
client.PrintMessage(
"WARNING: Missing some of the 3 dispatcher environment variables (%s, %s, %s) " +
"required for templating krb5.conf",
"WARNING: Missing some of the 3 dispatcher environment variables (%s, %s, %s) "+
"required for templating krb5.conf",
SPARK_KDC_HOSTNAME_KEY, SPARK_KDC_PORT_KEY, SPARK_KERBEROS_REALM_KEY)
}

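To make the intent of the prepareBase64Secret hunks above concrete, here is a minimal standalone sketch of the prefix-stripping behavior. The real function's early-return path is not shown in the hunk, so the pass-through case for non-prefixed filenames is an assumption, and the sketch's name is hypothetical.

package main

import (
	"fmt"
	"strings"
)

// Sketch of the logic shown above: take the filename from the secret path and
// strip the __dcos_base64__ prefix (Mesos decodes such secrets; the CLI only
// removes the prefix). Assumes non-prefixed filenames pass through unchanged.
func prepareBase64SecretSketch(secretPath string) string {
	parts := strings.Split(secretPath, "/")
	filename := parts[len(parts)-1]
	if strings.HasPrefix(filename, "__dcos_base64__") {
		return strings.TrimPrefix(filename, "__dcos_base64__")
	}
	return filename
}

func main() {
	fmt.Println(prepareBase64SecretSketch("spark/__dcos_base64__keytab")) // keytab
	fmt.Println(prepareBase64SecretSketch("spark/plain-secret"))          // plain-secret
}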
124 changes: 57 additions & 67 deletions cli/dcos-spark/submit_builder.go
@@ -6,14 +6,16 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/mesosphere/dcos-commons/cli/client"
"github.com/mesosphere/dcos-commons/cli/config"
"gopkg.in/alecthomas/kingpin.v3-unstable"
"log"
"net/url"
"os"
"regexp"
"strings"

"github.com/mattn/go-shellwords"
"github.com/mesosphere/dcos-commons/cli/client"
"github.com/mesosphere/dcos-commons/cli/config"
"gopkg.in/alecthomas/kingpin.v3-unstable"
)

var keyWhitespaceValPattern = regexp.MustCompile("(.+)\\s+(.+)")
@@ -146,8 +148,10 @@ Args:
StringVar(&args.mainClass) // note: spark-submit can autodetect, but only for file://local.jar
submit.Flag("properties-file", "Path to file containing whitespace-separated Spark property defaults.").
PlaceHolder("PATH").ExistingFileVar(&args.propertiesFile)
submit.Flag("conf", "Custom Spark configuration properties.").
PlaceHolder("PROP=VALUE").StringMapVar(&args.properties)
submit.Flag("conf", "Custom Spark configuration properties. "+
"If submitting properties with multiple values, "+
"wrap in single quotes e.g. --conf prop='val1 val2'").
PlaceHolder("prop=value").StringMapVar(&args.properties)
submit.Flag("kerberos-principal", "Principal to be used to login to KDC.").
PlaceHolder("user@REALM").Default("").StringVar(&args.kerberosPrincipal)
submit.Flag("keytab-secret-path", "Path to Keytab in secret store to be used in the Spark drivers").
@@ -280,75 +284,61 @@ func parseApplicationFile(args *sparkArgs) error {
return nil
}

// we use Kingpin to parse CLI commands and options
// spark-submit by convention uses '--arg val' while kingpin only supports --arg=val
Review thread:
Contributor: Could we create a ticket to address this? Kingpin supports both --arg val and --arg=val.
Author: …
Contributor: Great! Thanks.

// cleanUpSubmitArgs transforms the former to the latter
func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
Review thread:
Contributor: Maybe this function is misnamed - "cleanup" sounds like simply removing extra spaces or special characters.
Author: Hm, how about renaming to transformSubmitArgs?
Contributor: I'm not convinced that this is even needed. I just tested the quota command -- also defined using kingpin -- and the output is as follows:

➜  dcos-spark git:(DCOS-38138-shell-escape) ✗ dcos --log-level debug spark quota create --cpus 10 foo
Using CLI config file: /Users/elezar/.dcos/clusters/c173c4f2-c398-43c6-ab72-d46b99cb326a/dcos.toml
HTTP Query (96 byte payload): POST https://mvdrf110s.scaletesting.mesosphe.re/mesos/quota
{"role":"foo","guarantee":[{"name":"cpus","type":"SCALAR","scalar":{"value":10}}],"force":false}
^CUser interrupted command with Ctrl-C
➜  dcos-spark git:(DCOS-38138-shell-escape) ✗ dcos --log-level debug spark quota create --cpus=10 foo
Using CLI config file: /Users/elezar/.dcos/clusters/c173c4f2-c398-43c6-ab72-d46b99cb326a/dcos.toml
HTTP Query (96 byte payload): POST https://mvdrf110s.scaletesting.mesosphe.re/mesos/quota
{"role":"foo","guarantee":[{"name":"cpus","type":"SCALAR","scalar":{"value":10}}],"force":false}
^CUser interrupted command with Ctrl-C

Note that kingpin handles both the --arg=value and --arg value forms the same way. We could just drop this function entirely.
Author (Aug 14, 2018): If this is true, it will be a major... [animated image]
*edit: Thinking it over, maybe its original intent was to simply separate the spark-submit args from the application args, but it may have morphed since then.
It might be worth testing deleting the cleanUpSubmitArgs function and seeing if all the tests still pass (or maybe adding an --app-args flag, since that's needed for the application itself... but that would be a breaking API change). I also have a feeling that it does not handle long strings of args.

-// collapse two or more spaces to one.
-argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ")
-// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
-argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " "))
-// HACK: spark-submit uses '--arg val' by convention, while kingpin only supports '--arg=val'.
-// translate the former into the latter for kingpin to parse.
-args := strings.Split(argsCleaned, " ")
-argsEquals := make([]string, 0)
-appFlags := make([]string, 0)
-i := 0
-ARGLOOP:
-for i < len(args) {
-arg := args[i]
-if !strings.HasPrefix(arg, "-") {
-// looks like we've exited the flags entirely, and are now at the jar and/or args.
-// any arguments without a dash at the front should've been joined to preceding keys.
-// flush the rest and exit.
-for i < len(args) {
-arg = args[i]
-// if we have a --flag going to the application we need to take the arg (flag) and the value ONLY
-// if it's not of the format --flag=val which scopt allows
-if strings.HasPrefix(arg, "-") {
-appFlags = append(appFlags, arg)
-if strings.Contains(arg, "=") || (i+1) >= len(args) {
-i += 1
-} else {
-// if there's a value with this flag, add it
-if !strings.HasPrefix(args[i+1], "-") {
-appFlags = append(appFlags, args[i+1])
-i += 1
-}
-i += 1
-}
-} else {
-argsEquals = append(argsEquals, arg)
-i += 1
-}
-}
-break
-}
-// join this arg to the next arg if...:
-// 1. we're not at the last arg in the array
-// 2. we start with "--"
-// 3. we don't already contain "=" (already joined)
-// 4. we aren't a boolean value (no val to join)
-if i < len(args)-1 && strings.HasPrefix(arg, "--") && !strings.Contains(arg, "=") {
-// check for boolean:
+argsStr = strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsStr, " "))
+// collapse two or more spaces to one
+argsStr = collapseSpacesPattern.ReplaceAllString(argsStr, " ")
+// parse argsStr into []string args maintaining shell escaped sequences
+args, err := shellwords.Parse(argsStr)
+if err != nil {
+log.Fatalf("Could not parse string args correctly. Error: %v", err)
+}
+sparkArgs, appArgs := make([]string, 0), make([]string, 0)
+LOOP:
+for i := 0; i < len(args); {
+current := strings.TrimSpace(args[i])
+switch {
+// if current is a spark jar/app, we've processed all flags; add jar to sparkArgs and append the rest to appArgs
+case strings.HasSuffix(current, ".jar") || strings.HasSuffix(current, ".r") || strings.HasSuffix(current, ".py"):
Review thread:
Contributor: Don't know if it matters, but previously the check for an R app used a capital R: https://github.com/mesosphere/spark-build/blob/3026e88dae1df008705905ac38cfde164b660220/cli/dcos-spark/submit_builder.go#L251 - here it's lower case.
Author: Good catch! It probably does matter, so I'll change it back.
Contributor: Ping.
Contributor: In fact, how about having an isAppArtifact function or something? (With unit tests to ensure that we don't subtly change the behaviour.)
Author: Done: 6b32e5

+sparkArgs = append(sparkArgs, args[i])
+appArgs = append(appArgs, args[i+1:]...)
+break LOOP
+case strings.HasPrefix(current, "--"):
+// if current is a boolean flag, no merge required; eg --supervise
for _, boolVal := range boolVals {
-if boolVal.flagName == arg[2:] {
-argsEquals = append(argsEquals, arg)
-i += 1
-continue ARGLOOP
+if boolVal.flagName == current[2:] {
+sparkArgs = append(sparkArgs, current)
+i++
+continue LOOP
Review thread:
Contributor: What if we pass --supervise false or something similar? The code doesn't seem to handle this case explicitly. (It seems as if this will be joined in the default case, though.)
Author: You're right, false will get joined with --supervise and the job will fail. I just tried it and we get a parse error:

Error when parsing --submit-args: unknown long flag '--supervise false'

In this case, it might make sense to check for boolean flags and, if found, check the next argument. If it's a boolean literal like true/false, we could just move the pointer forward and skip it (i += 2).

More generally though, there are a lot of ways to mess up confs. Does it make sense to have some kind of blacklist that guards against 'user error'?

}
}
-// merge this --key against the following val to get --key=val
-argsEquals = append(argsEquals, arg+"="+args[i+1])
+if strings.Contains(current, "=") {
+// already in the form arg=val, no merge required
+sparkArgs = append(sparkArgs, current)
+i++
+continue LOOP
+}
+// otherwise, merge current with next into form arg=val; eg --driver-memory=512m
+next := args[i+1]
+sparkArgs = append(sparkArgs, current+"="+next)
+i += 2
-} else {
-// already joined or at the end, pass through:
-argsEquals = append(argsEquals, arg)
-i += 1
+default:
+// if not a flag or jar, current is a continuation of the last arg and should not have been split
+// eg extraJavaOptions="-Dparam1 -Dparam2" was parsed as [extraJavaOptions, -Dparam1, -Dparam2]
Review thread:
Contributor: Oh, this isn't handled by shellwords? (It sounded like it might be, based on the README.)
Author: No, unfortunately the library splits the string on non-escaped spaces, so this default case glues the string back together.

+combined := sparkArgs[len(sparkArgs)-1] + " " + current
+sparkArgs = append(sparkArgs[:len(sparkArgs)-1], combined)
+i++
}
}
-client.PrintVerbose("Translated spark-submit arguments: '%s'", argsEquals)
-client.PrintVerbose("Translated application arguments: '%s'", appFlags)
-
-return argsEquals, appFlags
+if config.Verbose {
+client.PrintVerbose("Translated spark-submit arguments: '%s'", strings.Join(sparkArgs, ", "))
+client.PrintVerbose("Translated application arguments: '%s'", strings.Join(appArgs, ", "))
+}
+return sparkArgs, appArgs
}
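To see what the shellwords-based parsing above does with quoted values, here is a minimal standalone sketch (illustrative input strings, not part of this PR):

package main

import (
	"fmt"

	"github.com/mattn/go-shellwords"
)

func main() {
	// Quotes are honored: the quoted value stays a single token,
	// with the quotes consumed by the parser.
	args, err := shellwords.Parse(`--driver-memory 512m --conf prop='val1 val2' app.jar --app-flag x`)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%q\n", args)
	// ["--driver-memory" "512m" "--conf" "prop=val1 val2" "app.jar" "--app-flag" "x"]

	// If an outer shell already stripped the quotes, the value splits into
	// separate tokens, which is what the default case above glues back together.
	args, _ = shellwords.Parse(`--conf prop=val1 val2 app.jar`)
	fmt.Printf("%q\n", args)
	// ["--conf" "prop=val1" "val2" "app.jar"]
}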
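Separately, a runnable sketch of the boolean-flag lookahead suggested in the --supervise review thread above (hypothetical helper name and semantics; the thread leaves open whether a false literal should drop the flag):

package main

import (
	"fmt"
	"strings"
)

// stripBooleanLiterals is a hypothetical pre-pass: when a known boolean flag
// such as --supervise is followed by a literal "true" or "false", consume the
// literal (the thread's "i += 2") instead of letting it merge into an unknown
// flag like "--supervise false".
func stripBooleanLiterals(args []string, boolFlags map[string]bool) []string {
	out := make([]string, 0, len(args))
	for i := 0; i < len(args); i++ {
		out = append(out, args[i])
		if boolFlags[strings.TrimPrefix(args[i], "--")] && i+1 < len(args) &&
			(args[i+1] == "true" || args[i+1] == "false") {
			i++ // skip the boolean literal
		}
	}
	return out
}

func main() {
	in := []string{"--supervise", "false", "--driver-memory", "512m"}
	fmt.Println(stripBooleanLiterals(in, map[string]bool{"supervise": true}))
	// [--supervise --driver-memory 512m]
}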

func getValsFromPropertiesFile(path string) map[string]string {
@@ -509,7 +499,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
} else {
client.PrintMessage("Using image '%s' for the driver and the executors (from %s).",
args.properties["spark.mesos.executor.docker.image"], imageSource)
client.PrintMessage("To disable this image on executors, set "+
client.PrintMessage("To disable this image on executors, set " +
"spark.mesos.executor.docker.forcePullImage=false")
args.properties["spark.mesos.executor.docker.forcePullImage"] = "true"
}