-
Notifications
You must be signed in to change notification settings - Fork 33
[DCOS-38138] Update Spark CLI for shell-escape fix #388
Changes from 41 commits
5150951
764e6c0
746969a
62b542b
a71e59f
9f1dee7
65a0f7c
8d347b9
8336832
db177cd
c5df110
55412bc
5a8e3f4
8297693
93f588a
96ac0a2
3026e88
7c294c7
7b12b34
ff65589
29701c3
376bf1b
f7ad435
6b32e56
c711c6c
34dffbc
d0a230e
1afd177
644c264
f267518
e7035d0
9f86664
0fef2c3
39434da
cfadbf1
9a69880
36faae7
fbc86dc
13bb7f0
7865b6c
0ca555a
d0aae3c
7b2bb6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,20 +6,30 @@ import ( | |
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "github.com/mesosphere/dcos-commons/cli/client" | ||
| "github.com/mesosphere/dcos-commons/cli/config" | ||
| "gopkg.in/alecthomas/kingpin.v3-unstable" | ||
| "log" | ||
| "net/url" | ||
| "os" | ||
| "regexp" | ||
| "strconv" | ||
| "strings" | ||
|
|
||
| "github.com/mattn/go-shellwords" | ||
| "github.com/mesosphere/dcos-commons/cli/client" | ||
| "github.com/mesosphere/dcos-commons/cli/config" | ||
| "gopkg.in/alecthomas/kingpin.v3-unstable" | ||
| ) | ||
|
|
||
| var keyWhitespaceValPattern = regexp.MustCompile("(.+)\\s+(.+)") | ||
| var backslashNewlinePattern = regexp.MustCompile("\\s*\\\\s*\\n\\s+") | ||
| var collapseSpacesPattern = regexp.MustCompile(`[\s\p{Zs}]{2,}`) | ||
|
|
||
| // AcceptedSparkAppExtensions lists the accepted extensions for Spark application artifacts | ||
| var AcceptedSparkAppExtensions = []string{ | ||
| ".jar", | ||
| ".py", | ||
| ".R", | ||
| } | ||
|
|
||
| type sparkVal struct { | ||
| flagName string | ||
| propName string | ||
|
|
@@ -146,8 +156,10 @@ Args: | |
| StringVar(&args.mainClass) // note: spark-submit can autodetect, but only for file://local.jar | ||
| submit.Flag("properties-file", "Path to file containing whitespace-separated Spark property defaults."). | ||
| PlaceHolder("PATH").ExistingFileVar(&args.propertiesFile) | ||
| submit.Flag("conf", "Custom Spark configuration properties."). | ||
| PlaceHolder("PROP=VALUE").StringMapVar(&args.properties) | ||
| submit.Flag("conf", "Custom Spark configuration properties. "+ | ||
| "If submitting properties with multiple values, "+ | ||
| "wrap in single quotes e.g. --conf prop='val1 val2'"). | ||
| PlaceHolder("prop=value").StringMapVar(&args.properties) | ||
| submit.Flag("kerberos-principal", "Principal to be used to login to KDC."). | ||
| PlaceHolder("user@REALM").Default("").StringVar(&args.kerberosPrincipal) | ||
| submit.Flag("keytab-secret-path", "Path to Keytab in secret store to be used in the Spark drivers"). | ||
|
|
@@ -280,75 +292,87 @@ func parseApplicationFile(args *sparkArgs) error { | |
| return nil | ||
| } | ||
|
|
||
| func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) { | ||
|
|
||
| // collapse two or more spaces to one. | ||
| argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ") | ||
| // we use Kingpin to parse CLI commands and options | ||
| // spark-submit by convention uses '--arg val' while kingpin only supports --arg=val | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we create a ticket to address this. Kingpin supports both
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here you go: https://jira.mesosphere.com/browse/DCOS-41107
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great! Thanks. |
||
| // cleanUpSubmitArgs transforms the former to the latter | ||
| func transformSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) { | ||
| // clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2" | ||
| argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " ")) | ||
| // HACK: spark-submit uses '--arg val' by convention, while kingpin only supports '--arg=val'. | ||
| // translate the former into the latter for kingpin to parse. | ||
| args := strings.Split(argsCleaned, " ") | ||
| argsEquals := make([]string, 0) | ||
| appFlags := make([]string, 0) | ||
| i := 0 | ||
| ARGLOOP: | ||
| for i < len(args) { | ||
| arg := args[i] | ||
| if !strings.HasPrefix(arg, "-") { | ||
| // looks like we've exited the flags entirely, and are now at the jar and/or args. | ||
| // any arguments without a dash at the front should've been joined to preceding keys. | ||
| // flush the rest and exit. | ||
| for i < len(args) { | ||
| arg = args[i] | ||
| // if we have a --flag going to the application we need to take the arg (flag) and the value ONLY | ||
| // if it's not of the format --flag=val which scopt allows | ||
| if strings.HasPrefix(arg, "-") { | ||
| appFlags = append(appFlags, arg) | ||
| if strings.Contains(arg, "=") || (i+1) >= len(args) { | ||
| i += 1 | ||
| } else { | ||
| // if there's a value with this flag, add it | ||
| if !strings.HasPrefix(args[i+1], "-") { | ||
| appFlags = append(appFlags, args[i+1]) | ||
| i += 1 | ||
| } | ||
| i += 1 | ||
| } | ||
| } else { | ||
| argsEquals = append(argsEquals, arg) | ||
| i += 1 | ||
| argsStr = strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsStr, " ")) | ||
| // collapse two or more spaces to one | ||
| argsStr = collapseSpacesPattern.ReplaceAllString(argsStr, " ") | ||
| // parse argsStr into []string args maintaining shell escaped sequences | ||
| args, err := shellwords.Parse(argsStr) | ||
| if err != nil { | ||
| log.Fatalf("Could not parse string args correctly. Error: %v", err) | ||
| } | ||
| sparkArgs, appArgs := make([]string, 0), make([]string, 0) | ||
| LOOP: | ||
| for i := 0; i < len(args); { | ||
| current := strings.TrimSpace(args[i]) | ||
| switch { | ||
| // The main assumption with --submit-args is that all spark-submit flags come before the spark jar URL | ||
| // if current is a spark jar/app, we've processed all flags | ||
| case isSparkApp(current): | ||
| sparkArgs = append(sparkArgs, args[i]) | ||
| appArgs = append(appArgs, args[i+1:]...) | ||
| break LOOP | ||
| case strings.HasPrefix(current, "--"): | ||
| isBool, skipNext := checkIfBooleanFlag(boolVals, args, i) | ||
| if isBool { | ||
| sparkArgs = append(sparkArgs, current) | ||
| i++ | ||
| if skipNext { | ||
| i++ | ||
| } | ||
| continue LOOP | ||
| } | ||
| break | ||
| } | ||
| // join this arg to the next arg if...: | ||
| // 1. we're not at the last arg in the array | ||
| // 2. we start with "--" | ||
| // 3. we don't already contain "=" (already joined) | ||
| // 4. we aren't a boolean value (no val to join) | ||
| if i < len(args)-1 && strings.HasPrefix(arg, "--") && !strings.Contains(arg, "=") { | ||
| // check for boolean: | ||
| for _, boolVal := range boolVals { | ||
| if boolVal.flagName == arg[2:] { | ||
| argsEquals = append(argsEquals, arg) | ||
| i += 1 | ||
| continue ARGLOOP | ||
| } | ||
| if strings.Contains(current, "=") { | ||
| // already in the form arg=val, no merge required | ||
| sparkArgs = append(sparkArgs, current) | ||
| i++ | ||
| continue LOOP | ||
| } | ||
| // merge this --key against the following val to get --key=val | ||
| argsEquals = append(argsEquals, arg+"="+args[i+1]) | ||
| // otherwise, merge current with next into form arg=val; eg --driver-memory=512m | ||
| next := args[i+1] | ||
| sparkArgs = append(sparkArgs, current+"="+next) | ||
| i += 2 | ||
| } else { | ||
| // already joined or at the end, pass through: | ||
| argsEquals = append(argsEquals, arg) | ||
| i += 1 | ||
| default: | ||
| // if not a flag or jar, current is a continuation of the last arg and should not have been split | ||
| // eg extraJavaOptions="-Dparam1 -Dparam2" was parsed as [extraJavaOptions, -Dparam1, -Dparam2] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, this isn't handled by shellwords? (It sounded like it might based on the README.)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No unfortunately the library splits the string on non-escaped spaces so this |
||
| combined := sparkArgs[len(sparkArgs)-1] + " " + current | ||
| sparkArgs = append(sparkArgs[:len(sparkArgs)-1], combined) | ||
| i++ | ||
| } | ||
| } | ||
| if config.Verbose { | ||
| client.PrintVerbose("Translated spark-submit arguments: '%s'", strings.Join(sparkArgs, ", ")) | ||
| client.PrintVerbose("Translated application arguments: '%s'", strings.Join(appArgs, ", ")) | ||
| } | ||
| return sparkArgs, appArgs | ||
| } | ||
|
|
||
| func isSparkApp(str string) bool { | ||
| for _, ext := range AcceptedSparkAppExtensions { | ||
| if strings.HasSuffix(str, ext) { | ||
| return true | ||
| } | ||
| } | ||
| client.PrintVerbose("Translated spark-submit arguments: '%s'", argsEquals) | ||
| client.PrintVerbose("Translated application arguments: '%s'", appFlags) | ||
| return false | ||
| } | ||
|
|
||
| return argsEquals, appFlags | ||
| func checkIfBooleanFlag(boolVals []*sparkVal, args []string, idx int) (isBool, skipNext bool) { | ||
| // check if current is a boolean flag (eg --supervise) but skip any accompanying boolean values (eg --supervise true) | ||
| current := args[idx] | ||
| for _, boolVal := range boolVals { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at this again this morning. We should break out of this loop as soon as we have found the matching flag -- either by returning, or by calling
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here you go: d0aae3c |
||
| if boolVal.flagName == current[2:] { | ||
| isBool = true | ||
| next := args[idx+1] | ||
| if _, err := strconv.ParseBool(next); err == nil { | ||
| skipNext = true | ||
|
||
| } | ||
| } | ||
| } | ||
| return | ||
| } | ||
|
|
||
| func getValsFromPropertiesFile(path string) map[string]string { | ||
|
|
@@ -416,7 +440,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) ( | |
| // then map flags | ||
| submit, args := sparkSubmitArgSetup() // setup | ||
| // convert and get application flags, add them to the args passed to the spark app | ||
| submitArgs, appFlags := cleanUpSubmitArgs(cmd.submitArgs, args.boolVals) | ||
| submitArgs, appFlags := transformSubmitArgs(cmd.submitArgs, args.boolVals) | ||
| args.appArgs = append(args.appArgs, appFlags...) | ||
| _, err := submit.Parse(submitArgs) | ||
|
|
||
|
|
@@ -509,7 +533,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) ( | |
| } else { | ||
| client.PrintMessage("Using image '%s' for the driver and the executors (from %s).", | ||
| args.properties["spark.mesos.executor.docker.image"], imageSource) | ||
| client.PrintMessage("To disable this image on executors, set "+ | ||
| client.PrintMessage("To disable this image on executors, set " + | ||
| "spark.mesos.executor.docker.forcePullImage=false") | ||
| args.properties["spark.mesos.executor.docker.forcePullImage"] = "true" | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this have to be public? Also, we could make the list local to the only function where it is used.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry had a Scala slip of the brain there, forgot caps make go code public. Was aiming to denote it as a constant 🙃
fixed in 7b2bb6c