Skip to content
Prev Previous commit
Next Next commit
Address PR comments
  • Loading branch information
shardulm94 committed Mar 16, 2021
commit bf91027ca580f6da11ac9d91ffe6d11383186844
3 changes: 3 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,9 @@ Apart from these, the following properties are also available, and may be useful
Useful for allowing Spark to resolve artifacts from behind a firewall e.g. via an in-house
artifact server like Artifactory. Details on the settings file format can be
found at <a href="http://ant.apache.org/ivy/history/latest-milestone/settings.html">Settings Files</a>
<p/>
When running in YARN cluster mode, this file will also be localized to the remote driver for dependency
resolution within <code>SparkContext#addJar</code>.
</td>
<td>2.2.0</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{FileSystem => _, _}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}

Expand All @@ -30,7 +31,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.util.control.NonFatal

import com.google.common.base.Objects
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
Expand Down Expand Up @@ -576,7 +576,7 @@ private[spark] class Client(
jarsDir.listFiles().foreach { f =>
if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
jarsStream.putNextEntry(new ZipEntry(f.getName))
Files.copy(f, jarsStream)
Files.copy(f.toPath, jarsStream)
jarsStream.closeEntry()
}
}
Expand Down Expand Up @@ -764,7 +764,7 @@ private[spark] class Client(
if url.getProtocol == "file" } {
val file = new File(url.getPath())
confStream.putNextEntry(new ZipEntry(file.getName()))
Files.copy(file, confStream)
Files.copy(file.toPath, confStream)
confStream.closeEntry()
}

Expand All @@ -775,7 +775,7 @@ private[spark] class Client(
hadoopConfFiles.foreach { case (name, file) =>
if (file.canRead()) {
confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
Files.copy(file, confStream)
Files.copy(file.toPath, confStream)
confStream.closeEntry()
}
}
Expand All @@ -795,20 +795,22 @@ private[spark] class Client(

// Upload user provided ivysettings.xml file to the distributed cache
val ivySettings = sparkConf.getOption("spark.jars.ivySettings")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to move this into the configuration package.scala and use ConfigBuilder. Even if we just reference it by the .key option in the SparkSubmitArguments file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be okay to handle this as a followup immediately after this PR? There are about 7-8 other places where this string is hardcoded and we can also refactor them out into config package.scala

Copy link
Contributor

@tgravescs tgravescs Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine; please file an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (isClusterMode && ivySettings.isDefined) {
val ivySettingsFile = new File(ivySettings.get)
require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
require(ivySettingsFile.isFile(),
s"Ivy settings file $ivySettingsFile is not a normal file")
// Generate a file name that can be used for the ivySettings file, that does not conflict
// with any other conf file.
val amIvySettingsFileName = ivySettingsFile.getName() + "-" + UUID.randomUUID().toString
confStream.putNextEntry(new ZipEntry(amIvySettingsFileName))
Files.copy(ivySettingsFile, confStream)
confStream.closeEntry()
ivySettings match {
case Some(ivySettingsPath) if isClusterMode && !Utils.isLocalUri(ivySettingsPath) =>
val ivySettingsFile = new File(ivySettingsPath)
require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
require(ivySettingsFile.isFile(),
s"Ivy settings file $ivySettingsFile is not a normal file")
// Generate a file name that can be used for the ivySettings file, that does not conflict
// with any other conf file.
val amIvySettingsFileName = ivySettingsFile.getName() + "-" + UUID.randomUUID().toString
confStream.putNextEntry(new ZipEntry(amIvySettingsFileName))
Files.copy(ivySettingsFile.toPath, confStream)
confStream.closeEntry()

// Override the ivySettings file name with the name of the distributed file
props.setProperty("spark.jars.ivySettings", s"$LOCALIZED_CONF_DIR/$amIvySettingsFileName")
// Override the ivySettings file name with the name of the distributed file
props.setProperty("spark.jars.ivySettings", s"$LOCALIZED_CONF_DIR/$amIvySettingsFileName")
case _ => // do nothing
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import scala.concurrent.duration._
import scala.io.Source

import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest.concurrent.Eventually._
Expand Down Expand Up @@ -373,7 +372,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
test("SPARK-34472: ivySettings file should be localized on driver in cluster mode") {

val emptyIvySettings = File.createTempFile("ivy", ".xml")
FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)
Files.write("<ivysettings />", emptyIvySettings, StandardCharsets.UTF_8)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use NIO for these? No need for commons-io now that NIO supports this kind of stuff built-in.

val emptyIvySettings = Files.createTempFile(...)
Files.write(emptyIvySettings, Seq(...))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to use Guava's Files which is used in many places within this file. Can I create a followup PR to replace these with NIO?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just saw this comment, sounds fine to me.

val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode = false,
Expand Down Expand Up @@ -625,7 +624,7 @@ private object YarnAddJarTest extends Logging {
}
if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) {
// "unresolved dependency" is expected as the dependency does not exist
// but exception like "Ivy settings file <file> does not exist should result in failure"
// but exception like "Ivy settings file <file> does not exist" should result in failure
result = "success"
}
} finally {
Expand Down