Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5150951
WIP add tests and update manifest with spark.tgz
samvantran Jul 26, 2018
764e6c0
WIP fix cleanUpSubmitArgs to handle special chars and multiargs
samvantran Jul 26, 2018
746969a
Cleanup
samvantran Jul 26, 2018
62b542b
Add vendor pkg go-shellwords
samvantran Jul 26, 2018
a71e59f
Fix url
samvantran Jul 26, 2018
9f1dee7
More cleanup + gofmt
samvantran Jul 27, 2018
65a0f7c
Fix single quote error
samvantran Jul 27, 2018
8d347b9
Fix descrip
samvantran Jul 27, 2018
8336832
More fixes and tests
samvantran Jul 27, 2018
db177cd
Debug why single quote fails via spark run
samvantran Jul 29, 2018
c5df110
Fixes and cleanup
samvantran Jul 29, 2018
55412bc
gofmt
samvantran Jul 29, 2018
5a8e3f4
Comment out test, need to create app to print out options
samvantran Jul 30, 2018
8297693
Add simple app + test for CI
samvantran Jul 31, 2018
93f588a
Cleanup and fix test
samvantran Aug 1, 2018
96ac0a2
Fixes
samvantran Aug 1, 2018
3026e88
Cleanup test cases
samvantran Aug 2, 2018
7c294c7
Address PR comments
samvantran Aug 2, 2018
7b12b34
Fix expected test output
samvantran Aug 2, 2018
ff65589
Write confs to tempfile
samvantran Aug 7, 2018
29701c3
Forgot arg in parent function
samvantran Aug 7, 2018
376bf1b
Let's try escaping the quotes
samvantran Aug 8, 2018
f7ad435
Alternatively, wrap entire fn in file
samvantran Aug 8, 2018
6b32e56
Add function isSparkApp
samvantran Aug 14, 2018
c711c6c
Print out all system.properties in app
samvantran Aug 14, 2018
34dffbc
Run the actual file in test
samvantran Aug 14, 2018
d0a230e
Add run perms to tempfile
samvantran Aug 14, 2018
1afd177
Octals are different in python3
samvantran Aug 15, 2018
644c264
Subprocess.run needs shell=True
samvantran Aug 15, 2018
f267518
Sleep right after chmod (potentially old Docker bug)
samvantran Aug 15, 2018
e7035d0
Holy bejesus it finally works
samvantran Aug 16, 2018
9f86664
Cleanup, move logic to test_spark and revert spark_utils
samvantran Aug 16, 2018
0fef2c3
Simplify test_multi_arg_confs
samvantran Aug 17, 2018
39434da
Address PR comments
samvantran Aug 20, 2018
cfadbf1
Cleanup
samvantran Aug 21, 2018
9a69880
Oops, too hasty with the revert
samvantran Aug 21, 2018
36faae7
Merge branch 'master' into DCOS-38138-shell-escape
samvantran Aug 24, 2018
fbc86dc
Use spark distro 2.6.5 created from default
samvantran Aug 24, 2018
13bb7f0
Resync test.sh from dcos-commons: use DOCKER_IMAGE envvar
Aug 25, 2018
7865b6c
Skip test_jar test
samvantran Aug 28, 2018
0ca555a
Merge branch 'master' into DCOS-38138-shell-escape
samvantran Aug 28, 2018
d0aae3c
Remove checking for bool values
samvantran Aug 29, 2018
7b2bb6c
Move app extensions closer to method
samvantran Aug 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cli/dcos-spark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,22 +209,22 @@ func (cmd *SparkCommand) runQuotaCreate(a *kingpin.Application, e *kingpin.Parse
requestValues := make([]quotaCreateGuarantee, 0)
if cmd.quotaCpus != 0 {
requestValues = append(requestValues, quotaCreateGuarantee{
Name: "cpus",
Type: "SCALAR",
Name: "cpus",
Type: "SCALAR",
Scalar: quotaScalar{Value: cmd.quotaCpus},
})
}
if cmd.quotaMem != 0 {
requestValues = append(requestValues, quotaCreateGuarantee{
Name: "mem",
Type: "SCALAR",
Name: "mem",
Type: "SCALAR",
Scalar: quotaScalar{Value: cmd.quotaMem},
})
}
if cmd.quotaGpus != 0 {
requestValues = append(requestValues, quotaCreateGuarantee{
Name: "gpus",
Type: "SCALAR",
Name: "gpus",
Type: "SCALAR",
Scalar: quotaScalar{Value: float64(cmd.quotaGpus)},
})
}
Expand Down Expand Up @@ -311,7 +311,7 @@ func handleCommands(app *kingpin.Application) {

run := app.Command("run", "Submit a job to the Spark Mesos Dispatcher").Action(cmd.runSubmit)
run.Flag("submit-args", fmt.Sprintf("Arguments matching what would be sent to 'spark-submit': %s",
sparkSubmitHelp())).Required().PlaceHolder("ARGS").StringVar(&cmd.submitArgs)
sparkSubmitHelp())).Required().PlaceHolder("\"ARGS\"").StringVar(&cmd.submitArgs)
// TODO this should be moved to submit args
run.Flag("docker-image", "Docker image to run the job within").
Default("").
Expand Down
20 changes: 9 additions & 11 deletions cli/dcos-spark/security.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
package main

import (
"fmt"
"errors"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"github.com/mesosphere/dcos-commons/cli/client"
"strings"
)


const KEYLENGTH = 128

var TASK_TYPES = []string{"driver", "executor"}

var SECRET_REFERENCE_PROPERTIES = map[string]string{
"driver": "spark.mesos.driver.secret.names",
"driver": "spark.mesos.driver.secret.names",
"executor": "spark.mesos.executor.secret.names",
}

var SECRET_FILENAME_PROPERTIES = map[string]string{
"driver": "spark.mesos.driver.secret.filenames",
"driver": "spark.mesos.driver.secret.filenames",
"executor": "spark.mesos.executor.secret.filenames",
}

var SECRET_ENVKEY_PROPERTIES = map[string]string{
"driver": "spark.mesos.driver.secret.envkeys",
"driver": "spark.mesos.driver.secret.envkeys",
"executor": "spark.mesos.executor.secret.envkeys",
}

Expand All @@ -34,7 +33,6 @@ const SPARK_KDC_PORT_KEY = "SPARK_SECURITY_KERBEROS_KDC_PORT"
const SPARK_KERBEROS_REALM_KEY = "SPARK_SECURITY_KERBEROS_REALM"
const SPARK_KERBEROS_KRB5_BLOB = "SPARK_MESOS_KRB5_CONF_BASE64"


// utility function used by SASL and Kerberos for user-defined secrets that may be base64 encoded blobs
// basically removes the prefix while ignoring the secret directory structure
func prepareBase64Secret(secretPath string) string {
Expand All @@ -44,7 +42,7 @@ func prepareBase64Secret(secretPath string) string {
}

absoluteSecretPath := strings.Split(secretPath, "/")
filename := absoluteSecretPath[len(absoluteSecretPath) - 1]
filename := absoluteSecretPath[len(absoluteSecretPath)-1]
// secrets with __dcos_base64__ will be decoded by Mesos, but remove the prefix here
if strings.HasPrefix(filename, "__dcos_base64__") {
return strings.TrimPrefix(filename, "__dcos_base64__")
Expand Down Expand Up @@ -92,7 +90,7 @@ func setupTLSArgs(args *sparkArgs) {
filenames := []string{keyStoreFileName}
envkeys := []string{"DCOS_SPARK_KEYSTORE"}

if args.truststoreSecretPath != "" { // optionally add the truststore configs also
if args.truststoreSecretPath != "" { // optionally add the truststore configs also
addPropertyAndWarn(args, "spark.ssl.trustStore", trustStoreFileName)
setPropertyToDefaultIfNotSet(args, "spark.ssl.trustStorePassword", args.truststorePassword)
paths = append(paths, args.truststoreSecretPath)
Expand Down Expand Up @@ -258,8 +256,8 @@ func forwardEnvironmentVariablesFromMarathonConfig(args *sparkArgs, marathonJson

if kdcPropCount > 0 && kdcPropCount != 3 {
client.PrintMessage(
"WARNING: Missing some of the 3 dispatcher environment variables (%s, %s, %s) " +
"required for templating krb5.conf",
"WARNING: Missing some of the 3 dispatcher environment variables (%s, %s, %s) "+
"required for templating krb5.conf",
SPARK_KDC_HOSTNAME_KEY, SPARK_KDC_PORT_KEY, SPARK_KERBEROS_REALM_KEY)
}

Expand Down
151 changes: 82 additions & 69 deletions cli/dcos-spark/submit_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/mesosphere/dcos-commons/cli/client"
"github.com/mesosphere/dcos-commons/cli/config"
"gopkg.in/alecthomas/kingpin.v3-unstable"
"log"
"net/url"
"os"
"regexp"
"strings"

"github.com/mattn/go-shellwords"
"github.com/mesosphere/dcos-commons/cli/client"
"github.com/mesosphere/dcos-commons/cli/config"
"gopkg.in/alecthomas/kingpin.v3-unstable"
)

var keyWhitespaceValPattern = regexp.MustCompile("(.+)\\s+(.+)")
Expand Down Expand Up @@ -146,8 +148,10 @@ Args:
StringVar(&args.mainClass) // note: spark-submit can autodetect, but only for file://local.jar
submit.Flag("properties-file", "Path to file containing whitespace-separated Spark property defaults.").
PlaceHolder("PATH").ExistingFileVar(&args.propertiesFile)
submit.Flag("conf", "Custom Spark configuration properties.").
PlaceHolder("PROP=VALUE").StringMapVar(&args.properties)
submit.Flag("conf", "Custom Spark configuration properties. "+
"If submitting properties with multiple values, "+
"wrap in single quotes e.g. --conf prop='val1 val2'").
PlaceHolder("prop=value").StringMapVar(&args.properties)
submit.Flag("kerberos-principal", "Principal to be used to login to KDC.").
PlaceHolder("user@REALM").Default("").StringVar(&args.kerberosPrincipal)
submit.Flag("keytab-secret-path", "Path to Keytab in secret store to be used in the Spark drivers").
Expand Down Expand Up @@ -280,75 +284,84 @@ func parseApplicationFile(args *sparkArgs) error {
return nil
}

func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {

// collapse two or more spaces to one.
argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ")
// we use Kingpin to parse CLI commands and options
// spark-submit by convention uses '--arg val' while kingpin only supports --arg=val
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we create a ticket to address this. Kingpin supports both --arg val and --arg=val.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Thanks.

// transformSubmitArgs turns the former into the latter
func transformSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " "))
// HACK: spark-submit uses '--arg val' by convention, while kingpin only supports '--arg=val'.
// translate the former into the latter for kingpin to parse.
args := strings.Split(argsCleaned, " ")
argsEquals := make([]string, 0)
appFlags := make([]string, 0)
i := 0
ARGLOOP:
for i < len(args) {
arg := args[i]
if !strings.HasPrefix(arg, "-") {
// looks like we've exited the flags entirely, and are now at the jar and/or args.
// any arguments without a dash at the front should've been joined to preceding keys.
// flush the rest and exit.
for i < len(args) {
arg = args[i]
// if we have a --flag going to the application we need to take the arg (flag) and the value ONLY
// if it's not of the format --flag=val which scopt allows
if strings.HasPrefix(arg, "-") {
appFlags = append(appFlags, arg)
if strings.Contains(arg, "=") || (i+1) >= len(args) {
i += 1
} else {
// if there's a value with this flag, add it
if !strings.HasPrefix(args[i+1], "-") {
appFlags = append(appFlags, args[i+1])
i += 1
}
i += 1
}
} else {
argsEquals = append(argsEquals, arg)
i += 1
}
argsStr = strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsStr, " "))
// collapse two or more spaces to one
argsStr = collapseSpacesPattern.ReplaceAllString(argsStr, " ")
// parse argsStr into []string args maintaining shell escaped sequences
args, err := shellwords.Parse(argsStr)
if err != nil {
log.Fatalf("Could not parse string args correctly. Error: %v", err)
}
sparkArgs, appArgs := make([]string, 0), make([]string, 0)
LOOP:
for i := 0; i < len(args); {
current := strings.TrimSpace(args[i])
switch {
// The main assumption with --submit-args is that all spark-submit flags come before the spark jar URL
// if current is a spark jar/app, we've processed all flags
case isSparkApp(current):
sparkArgs = append(sparkArgs, args[i])
appArgs = append(appArgs, args[i+1:]...)
break LOOP
case strings.HasPrefix(current, "--"):
if isBoolFlag(boolVals, current) {
sparkArgs = append(sparkArgs, current)
i++
continue LOOP
}
break
}
// join this arg to the next arg if...:
// 1. we're not at the last arg in the array
// 2. we start with "--"
// 3. we don't already contain "=" (already joined)
// 4. we aren't a boolean value (no val to join)
if i < len(args)-1 && strings.HasPrefix(arg, "--") && !strings.Contains(arg, "=") {
// check for boolean:
for _, boolVal := range boolVals {
if boolVal.flagName == arg[2:] {
argsEquals = append(argsEquals, arg)
i += 1
continue ARGLOOP
}
if strings.Contains(current, "=") {
// already in the form arg=val, no merge required
sparkArgs = append(sparkArgs, current)
i++
continue LOOP
}
// merge this --key against the following val to get --key=val
argsEquals = append(argsEquals, arg+"="+args[i+1])
// otherwise, merge current with next into form arg=val; eg --driver-memory=512m
next := args[i+1]
sparkArgs = append(sparkArgs, current+"="+next)
i += 2
} else {
// already joined or at the end, pass through:
argsEquals = append(argsEquals, arg)
i += 1
default:
// if not a flag or jar, current is a continuation of the last arg and should not have been split
// eg extraJavaOptions="-Dparam1 -Dparam2" was parsed as [extraJavaOptions, -Dparam1, -Dparam2]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this isn't handled by shellwords? (It sounded like it might based on the README.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No unfortunately the library splits the string on non-escaped spaces so this default case glues the string back together.

combined := sparkArgs[len(sparkArgs)-1] + " " + current
sparkArgs = append(sparkArgs[:len(sparkArgs)-1], combined)
i++
}
}
client.PrintVerbose("Translated spark-submit arguments: '%s'", argsEquals)
client.PrintVerbose("Translated application arguments: '%s'", appFlags)
if config.Verbose {
client.PrintVerbose("Translated spark-submit arguments: '%s'", strings.Join(sparkArgs, ", "))
client.PrintVerbose("Translated application arguments: '%s'", strings.Join(appArgs, ", "))
}
return sparkArgs, appArgs
}

return argsEquals, appFlags
// acceptedSparkAppExtensions lists the file extensions that mark an argument
// as the Spark application artifact itself (rather than a spark-submit flag).
var acceptedSparkAppExtensions = []string{
	".jar",
	".py",
	".R",
}

// isSparkApp reports whether str looks like a runnable Spark application,
// i.e. whether it ends in one of the accepted extensions (.jar, .py, .R).
// The comparison is case-sensitive.
func isSparkApp(str string) bool {
	for _, suffix := range acceptedSparkAppExtensions {
		if !strings.HasSuffix(str, suffix) {
			continue
		}
		return true
	}
	return false
}

// isBoolFlag reports whether str names one of the known boolean
// spark-submit flags (eg --supervise), which take no value and therefore
// must not be merged with the following argument.
//
// Callers pass flags in the "--name" form; the leading dashes are stripped
// before comparing against each registered flag name. strings.TrimPrefix is
// used instead of str[2:] so that an unexpectedly short input (len < 2)
// cannot panic with a slice-bounds error.
func isBoolFlag(boolVals []*sparkVal, str string) bool {
	flagName := strings.TrimPrefix(str, "--")
	for _, boolVal := range boolVals {
		if boolVal.flagName == flagName {
			return true
		}
	}
	return false
}

func getValsFromPropertiesFile(path string) map[string]string {
Expand Down Expand Up @@ -416,7 +429,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
// then map flags
submit, args := sparkSubmitArgSetup() // setup
// convert and get application flags, add them to the args passed to the spark app
submitArgs, appFlags := cleanUpSubmitArgs(cmd.submitArgs, args.boolVals)
submitArgs, appFlags := transformSubmitArgs(cmd.submitArgs, args.boolVals)
args.appArgs = append(args.appArgs, appFlags...)
_, err := submit.Parse(submitArgs)

Expand Down Expand Up @@ -509,7 +522,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
} else {
client.PrintMessage("Using image '%s' for the driver and the executors (from %s).",
args.properties["spark.mesos.executor.docker.image"], imageSource)
client.PrintMessage("To disable this image on executors, set "+
client.PrintMessage("To disable this image on executors, set " +
"spark.mesos.executor.docker.forcePullImage=false")
args.properties["spark.mesos.executor.docker.forcePullImage"] = "true"
}
Expand Down
Loading