[SPARK-34472][CORE] Ship ivySettings file to driver in cluster mode
shardulm94 committed Feb 19, 2021
commit 0d62c5cd023095afd7081355ec2ff1a8558fed7f
28 changes: 24 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -300,6 +300,15 @@ private[spark] class SparkSubmit extends Logging {
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)

// When running in cluster mode, add the ivySettings file to the list of files so that
// the driver can use it to resolve ivy packages
if (deployMode == CLUSTER && args.ivySettingsPath.isDefined) {
val ivySettingsFile = new File(args.ivySettingsPath.get)
require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a normal file")
args.files = mergeFileLists(args.files, ivySettingsFile.getAbsolutePath)
}

if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
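
For readers skimming the diff, a minimal sketch of the list-merging behavior the new code relies on (the semantics are assumed from the call site; this is not the actual SparkSubmit.mergeFileLists implementation):

object MergeFileListsSketch {
  // Assumed behavior: join comma-separated file lists, dropping null/empty entries,
  // so the ivySettings path rides along with whatever the user passed via --files.
  def mergeFileLists(lists: String*): String =
    lists.filter(s => s != null && s.nonEmpty).mkString(",")

  def main(args: Array[String]): Unit = {
    // An existing --files entry plus a resolved ivySettings path (paths illustrative).
    println(mergeFileLists("hdfs:///data/app.conf", "/etc/spark/ivysettings.xml"))
    // prints: hdfs:///data/app.conf,/etc/spark/ivysettings.xml
  }
}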
@@ -1089,7 +1098,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
}

/** Provides utility functions to be used inside SparkSubmit. */
private[spark] object SparkSubmitUtils {
private[spark] object SparkSubmitUtils extends Logging {

// Exposed for testing
var printStream = SparkSubmit.printStream
@@ -1278,11 +1287,22 @@ private[spark] object SparkSubmitUtils {
remoteRepos: Option[String],
ivyPath: Option[String]): IvySettings = {
val file = new File(settingsFile)
require(file.exists(), s"Ivy settings file $file does not exist")
require(file.isFile(), s"Ivy settings file $file is not a normal file")

Member:

@shardulm94, does it work if you set spark.jars.ivySettings to ./ivysettings.xml and pass ivysettings.xml to spark.yarn.files?

Member:

I know you might have to copy ivysettings.xml to the current working directory when spark-submit runs, but I think it might work for your use case.
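
Roughly, the workaround being suggested would look like this on the user side (a sketch only; the distributed-files key matches the one asserted in the test later in this diff, and the paths are illustrative):

import org.apache.spark.SparkConf

// Ship the settings file to the YARN containers and point spark.jars.ivySettings
// at the localized copy in the container's working directory (sketch; paths made up).
val conf = new SparkConf()
  .set("spark.yarn.dist.files", "file:///etc/spark/ivysettings.xml")
  .set("spark.jars.ivySettings", "./ivysettings.xml")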

shardulm94 (Contributor, Author), Feb 23, 2021:

Yeah, as you pointed out, it only works if I have the copy in the current working directory. However, we cannot control which directory our users launch spark-submit from, so ideally we would want something that works without user intervention.

What do you think of this? In YARN's Client, we can add the ivySettings file to spark.yarn.dist.files or __spark__conf__.zip, and then modify the property spark.jars.ivySettings within the Client to point to ./ivysettings.xml or __spark__conf__/ivysettings.xml.
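
A very rough sketch of what that rewrite could look like inside YARN's Client (hypothetical helper name and placement; the change the author actually tried is linked in a later comment and is not shown on this page):

import java.io.File
import org.apache.spark.SparkConf

// Hypothetical: distribute the user-provided settings file and rewrite the property to
// the localized file name so the driver resolves it from its working directory.
// (For brevity this ignores any files already listed in spark.yarn.dist.files.)
def relocateIvySettings(sparkConf: SparkConf): Unit = {
  sparkConf.getOption("spark.jars.ivySettings").foreach { path =>
    sparkConf.set("spark.yarn.dist.files", path)
    sparkConf.set("spark.jars.ivySettings", new File(path).getName)
  }
}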

Member:

Adding it into __spark__conf__ sounds okay to me ... but I would prefer to have second opinions from Yarn experts such as @tgravescs, @mridulm or @jerryshao.

shardulm94 (Contributor, Author):

I tried this out, and it looks like it can be handled pretty cleanly this way, targeting just YARN: shardulm94@12709f0

shardulm94 (Contributor, Author):

I updated the PR with this approach.

// When running the driver in cluster mode, the settingsFile is localized, so it needs to be
// accessed using just the file name rather than the full path
val localizedFile = new File(file.getName)
val resolvedFile = if (file.exists()) {
Some(file)
} else if (localizedFile.exists()) {
Some(localizedFile)
} else {
None
}
require(resolvedFile.isDefined, s"Ivy settings file $file does not exist")
require(resolvedFile.get.isFile, s"Ivy settings file $file is not a normal file")

val ivySettings: IvySettings = new IvySettings
try {
ivySettings.load(file)
ivySettings.load(resolvedFile.get)
} catch {
case e @ (_: IOException | _: ParseException) =>
throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)
15 changes: 15 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1525,6 +1525,21 @@ class SparkSubmitSuite
conf.get(k) should be (v)
}
}

test("SPARK-34472: ship ivySettings file to the driver in cluster mode") {
val args = Seq(
"--class", "Foo",
"--master", "yarn",
"--deploy-mode", "cluster",
"--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath}",
"app.jar"
)

val appArgs = new SparkSubmitArguments(args)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)

conf.get("spark.yarn.dist.files") should be(s"file://${emptyIvySettings.getAbsolutePath}")
}
}

object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -26,6 +26,7 @@ import scala.concurrent.duration._
import scala.io.Source

import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest.concurrent.Eventually._
@@ -368,6 +369,19 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
)
checkResult(finalState, result, "true")
}

test("SPARK-34472: ivySettings file should be localized on driver in cluster mode") {

val emptyIvySettings = File.createTempFile("ivy", ".xml")
FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)


Contributor:

Can we use NIO for these? No need for commons-io now that NIO supports this kind of stuff built-in.

val emptyIvySettings = Files.createTempFile(...)
Files.write(emptyIvySettings, Seq(...))
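
For reference, a complete NIO version might look like this (illustrative only; note that the PR ended up using Guava's Files instead, as discussed below, and the plain name Files would clash with the Guava import already present in this file):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files => NioFiles}

// NIO equivalents of the commons-io calls above (sketch only).
val emptyIvySettings = NioFiles.createTempFile("ivy", ".xml")
NioFiles.write(emptyIvySettings, "<ivysettings />".getBytes(StandardCharsets.UTF_8))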

shardulm94 (Contributor, Author):

I changed this to use Guava's Files, which is used in many places within this file. Can I create a follow-up PR to replace these with NIO?

Contributor:

Just saw this comment, sounds fine to me.

val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode = false,
mainClassName(YarnAddJarTest.getClass),
appArgs = Seq(result.getAbsolutePath),
extraConf = Map("spark.jars.ivySettings" -> emptyIvySettings.getAbsolutePath))
checkResult(finalState, result)
}
}

private[spark] class SaveExecutorInfo extends SparkListener {
@@ -583,6 +597,47 @@ private object YarnClasspathTest extends Logging {

}

private object YarnAddJarTest extends Logging {
def main(args: Array[String]): Unit = {
if (args.length != 1) {
// scalastyle:off println
System.err.println(
s"""
|Invalid command line: ${args.mkString(" ")}
|
|Usage: YarnAddJarTest [result file]
""".stripMargin)
// scalastyle:on println
System.exit(1)
}

val resultPath = args(0)
val sc = new SparkContext(new SparkConf())

var result = "failure"
try {
val settingsFile = sc.getConf.get("spark.jars.ivySettings")
// Delete the original ivySettings file so that we are sure the YARN-localized copy
// is the one used by the addJar call. In real cluster mode, the original settings file
// at the absolute path would not be present on the driver.
new File(settingsFile).delete()

val caught = intercept[Exception] {
sc.addJar("ivy://org.fake-project.test:test:1.0.0")
}
if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) {
// "unresolved dependency" is expected as the dependency does not exist
// but an exception like "Ivy settings file <file> does not exist" should result in failure
result = "success"
}
} finally {
Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
sc.stop()
}
}
}

private object YarnLauncherTestApp {

def main(args: Array[String]): Unit = {