2 changes: 1 addition & 1 deletion cli/dcos-spark/main.go
@@ -171,7 +171,7 @@ func handleCommands(app *kingpin.Application) {
 
 	run := app.Command("run", "Submit a job to the Spark Mesos Dispatcher").Action(cmd.runSubmit)
 	run.Flag("submit-args", fmt.Sprintf("Arguments matching what would be sent to 'spark-submit': %s",
-		sparkSubmitHelp())).Required().PlaceHolder("ARGS").StringVar(&cmd.submitArgs)
+		sparkSubmitHelp())).Required().PlaceHolder("\"ARGS\"").StringVar(&cmd.submitArgs)
 	// TODO this should be moved to submit args
 	run.Flag("docker-image", "Docker image to run the job within").
 		Default("").
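With the quoted placeholder, kingpin's generated help renders the flag as --submit-args="ARGS" rather than --submit-args=ARGS, signaling that the whole spark-submit string should be passed as a single quoted value, e.g. (jar URL is illustrative):

    dcos spark run --submit-args="--driver-memory 512m --class org.apache.spark.examples.SparkPi http://example.com/sparkpi.jar 30"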
149 changes: 81 additions & 68 deletions cli/dcos-spark/submit_builder.go
@@ -6,14 +6,16 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/mesosphere/dcos-commons/cli/client"
-	"github.com/mesosphere/dcos-commons/cli/config"
-	"gopkg.in/alecthomas/kingpin.v3-unstable"
 	"log"
 	"net/url"
 	"os"
 	"regexp"
 	"strings"
+
+	"github.com/mattn/go-shellwords"
+	"github.com/mesosphere/dcos-commons/cli/client"
+	"github.com/mesosphere/dcos-commons/cli/config"
+	"gopkg.in/alecthomas/kingpin.v3-unstable"
 )
 
 var keyWhitespaceValPattern = regexp.MustCompile("(.+)\\s+(.+)")
@@ -146,8 +148,10 @@ Args:
 		StringVar(&args.mainClass) // note: spark-submit can autodetect, but only for file://local.jar
 	submit.Flag("properties-file", "Path to file containing whitespace-separated Spark property defaults.").
 		PlaceHolder("PATH").ExistingFileVar(&args.propertiesFile)
-	submit.Flag("conf", "Custom Spark configuration properties.").
-		PlaceHolder("PROP=VALUE").StringMapVar(&args.properties)
+	submit.Flag("conf", "Custom Spark configuration properties. "+
+		"If submitting properties with multiple values, "+
+		"wrap in single quotes e.g. --conf prop='val1 val2'").
+		PlaceHolder("prop=value").StringMapVar(&args.properties)
 	submit.Flag("kerberos-principal", "Principal to be used to login to KDC.").
 		PlaceHolder("user@REALM").Default("").StringVar(&args.kerberosPrincipal)
 	submit.Flag("keytab-secret-path", "Path to Keytab in secret store to be used in the Spark drivers").
@@ -280,75 +284,84 @@ func parseApplicationFile(args *sparkArgs) error {
 	return nil
 }
 
-func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
-
-	// collapse two or more spaces to one.
-	argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ")
-	// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
-	argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " "))
-	// HACK: spark-submit uses '--arg val' by convention, while kingpin only supports '--arg=val'.
-	// translate the former into the latter for kingpin to parse.
-	args := strings.Split(argsCleaned, " ")
-	argsEquals := make([]string, 0)
-	appFlags := make([]string, 0)
-	i := 0
-ARGLOOP:
-	for i < len(args) {
-		arg := args[i]
-		if !strings.HasPrefix(arg, "-") {
-			// looks like we've exited the flags entirely, and are now at the jar and/or args.
-			// any arguments without a dash at the front should've been joined to preceding keys.
-			// flush the rest and exit.
-			for i < len(args) {
-				arg = args[i]
-				// if we have a --flag going to the application we need to take the arg (flag) and the value ONLY
-				// if it's not of the format --flag=val which scopt allows
-				if strings.HasPrefix(arg, "-") {
-					appFlags = append(appFlags, arg)
-					if strings.Contains(arg, "=") || (i+1) >= len(args) {
-						i += 1
-					} else {
-						// if there's a value with this flag, add it
-						if !strings.HasPrefix(args[i+1], "-") {
-							appFlags = append(appFlags, args[i+1])
-							i += 1
-						}
-						i += 1
-					}
-				} else {
-					argsEquals = append(argsEquals, arg)
-					i += 1
-				}
-			}
-			break
-		}
-		// join this arg to the next arg if...:
-		// 1. we're not at the last arg in the array
-		// 2. we start with "--"
-		// 3. we don't already contain "=" (already joined)
-		// 4. we aren't a boolean value (no val to join)
-		if i < len(args)-1 && strings.HasPrefix(arg, "--") && !strings.Contains(arg, "=") {
-			// check for boolean:
-			for _, boolVal := range boolVals {
-				if boolVal.flagName == arg[2:] {
-					argsEquals = append(argsEquals, arg)
-					i += 1
-					continue ARGLOOP
-				}
-			}
-			// merge this --key against the following val to get --key=val
-			argsEquals = append(argsEquals, arg+"="+args[i+1])
-			i += 2
-		} else {
-			// already joined or at the end, pass through:
-			argsEquals = append(argsEquals, arg)
-			i += 1
-		}
-	}
-	client.PrintVerbose("Translated spark-submit arguments: '%s'", argsEquals)
-	client.PrintVerbose("Translated application arguments: '%s'", appFlags)
-
-	return argsEquals, appFlags
-}
+// we use Kingpin to parse CLI commands and options
+// spark-submit by convention uses '--arg val' while kingpin only supports --arg=val
+// transformSubmitArgs turns the former into the latter
+func transformSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
+	// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
+	argsStr = strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsStr, " "))
+	// collapse two or more spaces to one
+	argsStr = collapseSpacesPattern.ReplaceAllString(argsStr, " ")
+	// parse argsStr into []string args maintaining shell escaped sequences
+	args, err := shellwords.Parse(argsStr)
+	if err != nil {
+		log.Fatalf("Could not parse string args correctly. Error: %v", err)
+	}
+	sparkArgs, appArgs := make([]string, 0), make([]string, 0)
+LOOP:
+	for i := 0; i < len(args); {
+		current := strings.TrimSpace(args[i])
+		switch {
+		// The main assumption with --submit-args is that all spark-submit flags come before the spark jar URL
+		// if current is a spark jar/app, we've processed all flags
+		case isSparkApp(current):
+			sparkArgs = append(sparkArgs, args[i])
+			appArgs = append(appArgs, args[i+1:]...)
+			break LOOP
+		case strings.HasPrefix(current, "--"):
+			if isBoolFlag(boolVals, current) {
+				sparkArgs = append(sparkArgs, current)
+				i++
+				continue LOOP
+			}
+			if strings.Contains(current, "=") {
+				// already in the form arg=val, no merge required
+				sparkArgs = append(sparkArgs, current)
+				i++
+				continue LOOP
+			}
+			// otherwise, merge current with next into form arg=val; eg --driver-memory=512m
+			next := args[i+1]
+			sparkArgs = append(sparkArgs, current+"="+next)
+			i += 2
+		default:
+			// if not a flag or jar, current is a continuation of the last arg and should not have been split
+			// eg extraJavaOptions="-Dparam1 -Dparam2" was parsed as [extraJavaOptions, -Dparam1, -Dparam2]
+			combined := sparkArgs[len(sparkArgs)-1] + " " + current
+			sparkArgs = append(sparkArgs[:len(sparkArgs)-1], combined)
+			i++
+		}
+	}
+	if config.Verbose {
+		client.PrintVerbose("Translated spark-submit arguments: '%s'", strings.Join(sparkArgs, ", "))
+		client.PrintVerbose("Translated application arguments: '%s'", strings.Join(appArgs, ", "))
+	}
+	return sparkArgs, appArgs
+}
+
+var acceptedSparkAppExtensions = []string{
+	".jar",
+	".py",
+	".R",
+}
+
+func isSparkApp(str string) bool {
+	for _, ext := range acceptedSparkAppExtensions {
+		if strings.HasSuffix(str, ext) {
+			return true
+		}
+	}
+	return false
+}
+
+// check if string is a boolean flag (eg --supervise)
+func isBoolFlag(boolVals []*sparkVal, str string) bool {
+	for _, boolVal := range boolVals {
+		if boolVal.flagName == str[2:] {
+			return true
+		}
+	}
+	return false
+}
 
 func getValsFromPropertiesFile(path string) map[string]string {
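The rewrite hinges on shell-style tokenization: go-shellwords keeps quoted values attached to their keys, where the old strings.Split on spaces tore them apart. A minimal standalone sketch of that behavior, not part of the PR; the expected output matches what the new tests below assert:

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/mattn/go-shellwords"
    )

    func main() {
    	// a single-quoted multi-word value stays attached to its key as one token
    	args, err := shellwords.Parse("--conf spark.driver.extraJavaOptions='-Dparam1=val1 -Dparam2=val2' main.jar 100")
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Printf("%q\n", args)
    	// ["--conf" "spark.driver.extraJavaOptions=-Dparam1=val1 -Dparam2=val2" "main.jar" "100"]
    }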
@@ -416,7 +429,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
 	// then map flags
 	submit, args := sparkSubmitArgSetup() // setup
 	// convert and get application flags, add them to the args passed to the spark app
-	submitArgs, appFlags := cleanUpSubmitArgs(cmd.submitArgs, args.boolVals)
+	submitArgs, appFlags := transformSubmitArgs(cmd.submitArgs, args.boolVals)
 	args.appArgs = append(args.appArgs, appFlags...)
 	_, err := submit.Parse(submitArgs)
 
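End to end, the split works like this (flags and values are illustrative): given --submit-args="--driver-memory 512m --supervise main.jar --input1 value1", transformSubmitArgs returns ["--driver-memory=512m", "--supervise", "main.jar"] for kingpin to parse and ["--input1", "value1"] as app args forwarded to the application; --supervise is one of the registered boolean flags, so no value is merged into it.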
53 changes: 49 additions & 4 deletions cli/dcos-spark/submit_builder_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
 	"os"
 	"testing"
@@ -37,10 +38,10 @@ func TestCliTestSuite(t *testing.T) {
 }
 
 // test spaces
-func (suite *CliTestSuite) TestCleanUpSubmitArgs() {
+func (suite *CliTestSuite) TestTransformSubmitArgs() {
 	_, args := sparkSubmitArgSetup()
-	inputArgs := "--conf spark.app.name=kerberosStreaming --conf spark.cores.max=8"
-	submitArgs, _ := cleanUpSubmitArgs(inputArgs, args.boolVals)
+	inputArgs := "--conf spark.app.name=kerberosStreaming --conf spark.cores.max=8 main.jar 100"
+	submitArgs, _ := transformSubmitArgs(inputArgs, args.boolVals)
 	if "--conf=spark.app.name=kerberosStreaming" != submitArgs[0] {
 		suite.T().Errorf("Failed to reduce spaces while cleaning submit args.")
 	}
@@ -50,12 +51,54 @@ func (suite *CliTestSuite) TestCleanUpSubmitArgs() {
 	}
 }
 
+func (suite *CliTestSuite) TestTransformSubmitArgsWithMultipleValues() {
+	_, args := sparkSubmitArgSetup()
+	inputArgs := "--conf spark.driver.extraJavaOptions='-XX:+PrintGC -Dparam1=val1 -Dparam2=val2' main.py 100"
+	expected := "--conf=spark.driver.extraJavaOptions=-XX:+PrintGC -Dparam1=val1 -Dparam2=val2"
+	actual, _ := transformSubmitArgs(inputArgs, args.boolVals)
+	assert.Equal(suite.T(), expected, actual[0])
+}
+
+func (suite *CliTestSuite) TestTransformSubmitArgsWithSpecialCharacters() {
+	_, args := sparkSubmitArgSetup()
+	inputArgs := "--conf spark.driver.extraJavaOptions='-Dparam1=\"val 1?\" -Dparam2=val\\ 2! -Dmulti.dot.param3=\"val\\ 3\" -Dpath=$PATH' main.py 100"
+	expected := "--conf=spark.driver.extraJavaOptions=-Dparam1=\"val 1?\" -Dparam2=val\\ 2! -Dmulti.dot.param3=\"val\\ 3\" -Dpath=$PATH"
+	actual, _ := transformSubmitArgs(inputArgs, args.boolVals)
+	assert.Equal(suite.T(), expected, actual[0])
+}
+
+func (suite *CliTestSuite) TestTransformSubmitArgsConfsAlreadyHasEquals() {
+	_, args := sparkSubmitArgSetup()
+	inputArgs := "--conf=spark.driver.extraJavaOptions='-Dparam1=val1' main.py 100"
+	expected := "--conf=spark.driver.extraJavaOptions=-Dparam1=val1"
+	actual, _ := transformSubmitArgs(inputArgs, args.boolVals)
+	assert.Equal(suite.T(), expected, actual[0])
+}
+
+func (suite *CliTestSuite) TestTransformSubmitArgsMultilines() {
+	_, args := sparkSubmitArgSetup()
+	inputArgs := `--conf spark.driver.extraJavaOptions='-XX:+PrintGC -XX:+PrintGCTimeStamps' \
+	--supervise --driver-memory 1g \
+	main.py 100`
+	expected := []string{"--conf=spark.driver.extraJavaOptions=-XX:+PrintGC -XX:+PrintGCTimeStamps", "--supervise", "--driver-memory=1g", "main.py"}
+	actual, _ := transformSubmitArgs(inputArgs, args.boolVals)
+	assert.Equal(suite.T(), expected, actual)
+}
+
+func (suite *CliTestSuite) TestIsSparkApp() {
+	assert.True(suite.T(), isSparkApp("mainApp.jar"))
+	assert.True(suite.T(), isSparkApp("pythonFile.py"))
+	assert.True(suite.T(), isSparkApp("file.R"))
+	assert.False(suite.T(), isSparkApp("app.c"))
+	assert.False(suite.T(), isSparkApp("randomFlag"))
+}
+
 // test scopts pattern for app args when have full submit args
 func (suite *CliTestSuite) TestScoptAppArgs() {
 	_, args := sparkSubmitArgSetup()
 	inputArgs := `--driver-cores 1 --conf spark.cores.max=1 --driver-memory 512M
 	--class org.apache.spark.examples.SparkPi http://spark-example.jar --input1 value1 --input2 value2`
-	submitArgs, appFlags := cleanUpSubmitArgs(inputArgs, args.boolVals)
+	submitArgs, appFlags := transformSubmitArgs(inputArgs, args.boolVals)
 
 	if "--input1" != appFlags[0] {
 		suite.T().Errorf("Failed to parse app args.")
@@ -91,6 +134,7 @@ func (suite *CliTestSuite) TestPayloadSimple() {
"--driver-cores %s "+
"--conf spark.cores.max=%s "+
"--driver-memory %s "+
"--conf spark.driver.extraJavaOptions='-XX:+PrintGC -Dparam1=val1 -Dparam2=val2' "+
"--class %s "+
"%s --input1 value1 --input2 value2", driverCores, maxCores, driverMemory, mainClass, appJar)

Expand Down Expand Up @@ -124,6 +168,7 @@ func (suite *CliTestSuite) TestPayloadSimple() {
"spark.submit.deployMode": "cluster",
"spark.mesos.driver.labels": fmt.Sprintf("DCOS_SPACE:%s", marathonAppId),
"spark.driver.memory": driverMemory,
"spark.driver.extraJavaOptions": "-XX:+PrintGC -Dparam1=val1 -Dparam2=val2",
"spark.jars": appJar,
}

Expand Down

Some generated files are not rendered by default.