Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,25 @@ private[spark] class Client(
// distributed file.
amKeytabFileName.foreach { kt => props.setProperty(KEYTAB.key, kt) }

// Upload user provided ivysettings.xml file to the distributed cache
val ivySettings = sparkConf.getOption("spark.jars.ivySettings")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to move this into the configuration package.scala and use ConfigBuilder. Even if we just reference it by the .key option in the SparkSubmitArguments file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be okay to handle this as a followup immediately after this PR? There are about 7-8 other places where this string is hardcoded and we can also refactor them out into config package.scala

Copy link
Contributor

@tgravescs tgravescs Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine, please file an issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (isClusterMode && ivySettings.isDefined) {
val ivySettingsFile = new File(ivySettings.get)
require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a little more Scala-idiomatic for unpacking the Option:

      ivySettings match {
        case Some(ivySettingsPath) if isClusterMode =>
          // ... logic here
        case _ => // do nothing
      }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also check to see if Utils.isLocalUri

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

require(ivySettingsFile.isFile(),
s"Ivy settings file $ivySettingsFile is not a normal file")
// Generate a file name that can be used for the ivySettings file, that does not conflict
// with any other conf file.
val amIvySettingsFileName = ivySettingsFile.getName() + "-" + UUID.randomUUID().toString
confStream.putNextEntry(new ZipEntry(amIvySettingsFileName))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to just use distribute() rather then putting into the conf files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would help save some RPCs in an HDFS environment, more context in
#31591 (review) See drawback (2)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think also, semantically, this is more similar to a Spark configuration file than a user-provided file and thus makes sense to have within the conf zip as opposed to living with the user JARs/files. This is similar to the approach used for log4j.properties and metrics.properties above.

Copy link
Contributor

@tgravescs tgravescs Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that its kind of like a conf file but its not, there is no template file in the conf directory for this, it's a user specified config and not automatically picked up, so I would rather keep things consistent and have it treated like all the other configs similar to it.
Also we don't want to support this being in HDFS? I think that comes for free if you just use distribute, I don't see a few rpc calls as being a big deal here especially since it adds functionality. I guess if you allow that you might need more code on the client side to download it. Its a bit unfortunately we are not consistent with these things.

Personally I would prefer us to stay consistent and this act just like other files users can specify. it should allow being in HDFS and it should be distributed via distribute() just like all the other files users can specify. If you disagree please let me know why.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that its kind of like a conf file but its not, there is no template file in the conf directory for this, it's a user specified config and not automatically picked up ... Its a bit unfortunately we are not consistent with these things.

The same is (mostly) true for examples like metrics.properties, which is an external/non-Spark file, the only difference being that it is automatically picked up vs. being requested by a user. I don't have much opinion on whether or not this is how it should be -- I agree with you that more clear guidelines/consistency in this area would be nice.

I don't have too much opinion between storing in the conf object and leveraging distribute, I do agree that we should support remote file systems (e.g. HDFS) so this is a good point. @HyukjinKwon do you have any opinion here since you participated in some of the earlier conversations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the PR using distribute() to localize the ivySettings file instead of putting the file in Spark conf zip. I also had to slightly modify the ivySettings loading code in SparkSubmit.scala since it did not support local URIs in the first place. I also added tests for local and non-local URIs for ivySettings.

Files.copy(ivySettingsFile, confStream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use java.nio.file.Files here instead of Guava's Files? I know Guava is already used in this file but I think it's better to leverage the built-in functionality moving forward. You can do a rename import like import java.nio.file.{Files => NioFiles} (or update the other references as well if it's a small change?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

confStream.closeEntry()

// Override the ivySettings file name with the name of the distributed file
props.setProperty("spark.jars.ivySettings", s"$LOCALIZED_CONF_DIR/$amIvySettingsFileName")
}


writePropertiesToArchive(props, SPARK_CONF_FILE, confStream)

// Write the distributed cache config to the archive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.concurrent.duration._
import scala.io.Source

import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest.concurrent.Eventually._
Expand Down Expand Up @@ -368,6 +369,19 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
)
checkResult(finalState, result, "true")
}

test("SPARK-34472: ivySettings file should be localized on driver in cluster mode") {

val emptyIvySettings = File.createTempFile("ivy", ".xml")
FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use NIO for these? No need for commons-io now that NIO supports this kind of stuff built-in.

val emptyIvySettings = Files.createTempFile(...)
Files.write(emptyIvySettings, Seq(...))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to use Guava's Files which is used in many places within this file. Can I create a followup PR to replace these with NIO?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just saw this comment, sounds fine to me.

val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode = false,
mainClassName(YarnAddJarTest.getClass),
appArgs = Seq(result.getAbsolutePath),
extraConf = Map("spark.jars.ivySettings" -> emptyIvySettings.getAbsolutePath))
checkResult(finalState, result)
}
}

private[spark] class SaveExecutorInfo extends SparkListener {
Expand Down Expand Up @@ -583,6 +597,44 @@ private object YarnClasspathTest extends Logging {

}

private object YarnAddJarTest extends Logging {
def main(args: Array[String]): Unit = {
if (args.length != 1) {
// scalastyle:off println
System.err.println(
s"""
|Invalid command line: ${args.mkString(" ")}
|
|Usage: YarnAddJarTest [result file]
""".stripMargin)
// scalastyle:on println
System.exit(1)
}

val resultPath = args(0)
val sc = new SparkContext(new SparkConf())

var result = "failure"
try {
val settingsFile = sc.getConf.get("spark.jars.ivySettings")
// Make sure that ivySettings conf was set to the localized file
assert(settingsFile.startsWith(Client.LOCALIZED_CONF_DIR))

val caught = intercept[RuntimeException] {
sc.addJar("ivy://org.fake-project.test:test:1.0.0")
}
if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) {
// "unresolved dependency" is expected as the dependency does not exist
// but exception like "Ivy settings file <file> does not exist should result in failure"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit: end quote is in the wrong place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

result = "success"
}
} finally {
Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
sc.stop()
}
}
}

private object YarnLauncherTestApp {

def main(args: Array[String]): Unit = {
Expand Down