Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 94 additions & 22 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.report.{DownloadStatus, ResolveReport}
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
Expand Down Expand Up @@ -1226,7 +1226,7 @@ private[spark] object SparkSubmitUtils extends Logging {
s"be whitespace. The artifactId provided is: ${splits(1)}")
require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
s"be whitespace. The version provided is: ${splits(2)}")
new MavenCoordinate(splits(0), splits(1), splits(2))
MavenCoordinate(splits(0), splits(1), splits(2))
}
}

Expand All @@ -1241,21 +1241,27 @@ private[spark] object SparkSubmitUtils extends Logging {
}

/**
* Extracts maven coordinates from a comma-delimited string
* Create a ChainResolver used by Ivy to search for and resolve dependencies.
*
* @param defaultIvyUserDir The default user path for Ivy
* @param useLocalM2AsCache Whether to use the local maven repo as a cache
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
def createRepoResolvers(
defaultIvyUserDir: File,
useLocalM2AsCache: Boolean = true): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
cr.setName("spark-list")

val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
localM2.setRoot(m2Path.toURI.toString)
localM2.setUsepoms(true)
localM2.setName("local-m2-cache")
cr.add(localM2)
if (useLocalM2AsCache) {
val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
localM2.setRoot(m2Path.toURI.toString)
localM2.setUsepoms(true)
localM2.setName("local-m2-cache")
cr.add(localM2)
}

val localIvy = new FileSystemResolver
val localIvyRoot = new File(defaultIvyUserDir, "local")
Expand Down Expand Up @@ -1351,18 +1357,23 @@ private[spark] object SparkSubmitUtils extends Logging {

/**
* Build Ivy Settings using options with default resolvers
*
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @param useLocalM2AsCache Whether or not to use the `local-m2` repo as a cache
* @return An IvySettings object
*/
def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
def buildIvySettings(
remoteRepos: Option[String],
ivyPath: Option[String],
useLocalM2AsCache: Boolean = true): IvySettings = {
val ivySettings: IvySettings = new IvySettings
processIvyPathArg(ivySettings, ivyPath)

// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)
processRemoteRepoArg(ivySettings, remoteRepos)
Expand Down Expand Up @@ -1459,22 +1470,48 @@ private[spark] object SparkSubmitUtils extends Logging {
*/
private def clearIvyResolutionFiles(
mdId: ModuleRevisionId,
ivySettings: IvySettings,
defaultCacheFile: File,
ivyConfName: String): Unit = {
val currentResolutionFiles = Seq(
s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml",
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
)
currentResolutionFiles.foreach { filename =>
new File(ivySettings.getDefaultCache, filename).delete()
new File(defaultCacheFile, filename).delete()
}
}

/**
* Clear invalid cache files in ivy. The cache file is usually at
* ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml,
* ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and
* ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties.
* When the `local-m2` repo is used as a cache, some invalid files can be created here.
* If they are not deleted, an error similar to `unknown resolver local-m2-cache`
* is reported, which is confusing for users.
*/
private def clearInvalidIvyCacheFiles(
    mdId: ModuleRevisionId,
    defaultCacheFile: File): Unit = {
  // Relative directory of the module inside the cache: <organisation>/<name>/
  val moduleDir = s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}"
  val revision = mdId.getRevision
  // The cached descriptor, its ".original" copy and the ivydata properties for this revision.
  val staleNames = Seq(
    s"ivy-$revision.xml",
    s"ivy-$revision.xml.original",
    s"ivydata-$revision.properties")
  // The Boolean result of delete() is not checked, so a missing file is simply skipped.
  for (name <- staleNames) {
    new File(defaultCacheFile, moduleDir + name).delete()
  }
}

/**
* Resolves any dependencies that were supplied through maven coordinates
*
* @param coordinates Comma-delimited string of maven coordinates
* @param ivySettings An IvySettings containing resolvers to use
* @param noCacheIvySettings Optional IvySettings that skip the local-m2-cache resolver; used to
* retry resolution when artifact downloads fail
* @param transitive Whether resolving transitive dependencies, default is true
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return Seq of path to the jars of the given maven artifacts including their
Expand All @@ -1483,6 +1520,7 @@ private[spark] object SparkSubmitUtils extends Logging {
def resolveMavenCoordinates(
coordinates: String,
ivySettings: IvySettings,
noCacheIvySettings: Option[IvySettings] = None,
transitive: Boolean,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): Seq[String] = {
Expand Down Expand Up @@ -1511,6 +1549,8 @@ private[spark] object SparkSubmitUtils extends Logging {
// scalastyle:on println

val ivy = Ivy.newInstance(ivySettings)
ivy.pushContext()

// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(transitive)
Expand All @@ -1523,6 +1563,11 @@ private[spark] object SparkSubmitUtils extends Logging {
} else {
resolveOptions.setDownload(true)
}
// retrieve all resolved dependencies
retrieveOptions.setDestArtifactPattern(
packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision](-[classifier]).[ext]")
retrieveOptions.setConfs(Array(ivyConfName))

// Add exclusion rules for Spark and Scala Library
addExclusionRules(ivySettings, ivyConfName, md)
Expand All @@ -1534,17 +1579,44 @@ private[spark] object SparkSubmitUtils extends Logging {
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
// SPARK-46302: When there are some corrupted jars in the local maven repo,
// we try to continue without the cache
val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
val failedArtifacts = failedReports.map(r => r.getArtifact)
logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
s"attempt to retry while skipping local-m2-cache.")
failedArtifacts.foreach(artifact => {
clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
})
ivy.popContext()

val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get)
noCacheIvy.pushContext()

val noCacheRr = noCacheIvy.resolve(md, resolveOptions)
if (noCacheRr.hasError) {
throw new RuntimeException(noCacheRr.getAllProblemMessages.toString)
}
noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
val dependencyPaths = resolveDependencyPaths(
noCacheRr.getArtifacts.toArray, packagesDirectory)
noCacheIvy.popContext()

dependencyPaths
} else {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
} else {
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
ivy.popContext()

dependencyPaths
}
// retrieve all resolved dependencies
retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision](-[classifier]).[ext]")
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
retrieveOptions.setConfs(Array(ivyConfName)))
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
} finally {
System.setOut(sysOut)
clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)
clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ private[hive] object IsolatedClientLoader extends Logging {
SparkSubmitUtils.buildIvySettings(
Some(remoteRepos),
ivyPath),
Some(SparkSubmitUtils.buildIvySettings(
Some(remoteRepos),
ivyPath,
useLocalM2AsCache = false)),
transitive = true,
exclusions = version.exclusions)
}
Expand Down