diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index af35f451e370..0f0d8b6c07c0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,7 +41,7 @@ import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
 import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.report.{DownloadStatus, ResolveReport}
 import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
@@ -1226,7 +1226,7 @@ private[spark] object SparkSubmitUtils extends Logging {
         s"be whitespace. The artifactId provided is: ${splits(1)}")
       require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
         s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
+      MavenCoordinate(splits(0), splits(1), splits(2))
     }
   }
 
@@ -1241,21 +1241,27 @@ private[spark] object SparkSubmitUtils extends Logging {
   }
 
   /**
-   * Extracts maven coordinates from a comma-delimited string
+   * Create a ChainResolver used by Ivy to search for and resolve dependencies.
+   *
    * @param defaultIvyUserDir The default user path for Ivy
+   * @param useLocalM2AsCache Whether to use the local maven repo as a cache
    * @return A ChainResolver used by Ivy to search for and resolve dependencies.
    */
-  def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
+  def createRepoResolvers(
+      defaultIvyUserDir: File,
+      useLocalM2AsCache: Boolean = true): ChainResolver = {
     // We need a chain resolver if we want to check multiple repositories
     val cr = new ChainResolver
     cr.setName("spark-list")
 
-    val localM2 = new IBiblioResolver
-    localM2.setM2compatible(true)
-    localM2.setRoot(m2Path.toURI.toString)
-    localM2.setUsepoms(true)
-    localM2.setName("local-m2-cache")
-    cr.add(localM2)
+    if (useLocalM2AsCache) {
+      val localM2 = new IBiblioResolver
+      localM2.setM2compatible(true)
+      localM2.setRoot(m2Path.toURI.toString)
+      localM2.setUsepoms(true)
+      localM2.setName("local-m2-cache")
+      cr.add(localM2)
+    }
 
     val localIvy = new FileSystemResolver
     val localIvyRoot = new File(defaultIvyUserDir, "local")
@@ -1351,18 +1357,23 @@ private[spark] object SparkSubmitUtils extends Logging {
 
   /**
    * Build Ivy Settings using options with default resolvers
+   *
    * @param remoteRepos Comma-delimited string of remote repositories other than maven central
    * @param ivyPath The path to the local ivy repository
+   * @param useLocalM2AsCache Whether or not to use the local-m2 repo as a cache
    * @return An IvySettings object
    */
-  def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
+  def buildIvySettings(
+      remoteRepos: Option[String],
+      ivyPath: Option[String],
+      useLocalM2AsCache: Boolean = true): IvySettings = {
     val ivySettings: IvySettings = new IvySettings
     processIvyPathArg(ivySettings, ivyPath)
 
     // create a pattern matcher
     ivySettings.addMatcher(new GlobPatternMatcher)
     // create the dependency resolvers
-    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache)
     ivySettings.addResolver(repoResolver)
     ivySettings.setDefaultResolver(repoResolver.getName)
     processRemoteRepoArg(ivySettings, remoteRepos)
@@ -1459,7 +1470,7 @@ private[spark] object SparkSubmitUtils extends Logging {
    */
   private def clearIvyResolutionFiles(
       mdId: ModuleRevisionId,
-      ivySettings: IvySettings,
+      defaultCacheFile: File,
       ivyConfName: String): Unit = {
     val currentResolutionFiles = Seq(
       s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
@@ -1467,14 +1478,40 @@ private[spark] object SparkSubmitUtils extends Logging {
       s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
     )
     currentResolutionFiles.foreach { filename =>
-      new File(ivySettings.getDefaultCache, filename).delete()
+      new File(defaultCacheFile, filename).delete()
+    }
+  }
+
+  /**
+   * Clear invalid cache files in Ivy. The cache files usually live at
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml,
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties.
+   * When the `local-m2` repo is used as a cache, invalid files may be created there.
+   * If they are not deleted here, an error similar to `unknown resolver local-m2-cache`
+   * is reported, which confuses users.
+   */
+  private def clearInvalidIvyCacheFiles(
+      mdId: ModuleRevisionId,
+      defaultCacheFile: File): Unit = {
+    val cacheFiles = Seq(
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml.original",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivydata-${mdId.getRevision}.properties")
+    cacheFiles.foreach { filename =>
+      new File(defaultCacheFile, filename).delete()
     }
   }
 
   /**
    * Resolves any dependencies that were supplied through maven coordinates
+   *
    * @param coordinates Comma-delimited string of maven coordinates
    * @param ivySettings An IvySettings containing resolvers to use
+   * @param noCacheIvySettings A no-cache (skipping local-m2-cache) IvySettings containing resolvers to use
    * @param transitive Whether resolving transitive dependencies, default is true
    * @param exclusions Exclusions to apply when resolving transitive dependencies
    * @return Seq of path to the jars of the given maven artifacts including their
@@ -1483,6 +1520,7 @@ private[spark] object SparkSubmitUtils extends Logging {
   def resolveMavenCoordinates(
       coordinates: String,
       ivySettings: IvySettings,
+      noCacheIvySettings: Option[IvySettings] = None,
       transitive: Boolean,
       exclusions: Seq[String] = Nil,
       isTest: Boolean = false): Seq[String] = {
@@ -1511,6 +1549,8 @@ private[spark] object SparkSubmitUtils extends Logging {
       // scalastyle:on println
 
       val ivy = Ivy.newInstance(ivySettings)
+      ivy.pushContext()
+
       // Set resolve options to download transitive dependencies as well
       val resolveOptions = new ResolveOptions
       resolveOptions.setTransitive(transitive)
@@ -1523,6 +1563,11 @@ private[spark] object SparkSubmitUtils extends Logging {
       } else {
         resolveOptions.setDownload(true)
       }
+      // retrieve all resolved dependencies
+      retrieveOptions.setDestArtifactPattern(
+        packagesDirectory.getAbsolutePath + File.separator +
+          "[organization]_[artifact]-[revision](-[classifier]).[ext]")
+      retrieveOptions.setConfs(Array(ivyConfName))
 
       // Add exclusion rules for Spark and Scala Library
       addExclusionRules(ivySettings, ivyConfName, md)
@@ -1534,17 +1579,44 @@ private[spark] object SparkSubmitUtils extends Logging {
       // resolve dependencies
       val rr: ResolveReport = ivy.resolve(md, resolveOptions)
       if (rr.hasError) {
-        throw new RuntimeException(rr.getAllProblemMessages.toString)
+        // SPARK-46302: When there are corrupted jars in the local maven repo,
+        // we try to continue without the cache
+        val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
+        if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
+          val failedArtifacts = failedReports.map(r => r.getArtifact)
+          logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
+            s"attempting to retry while skipping local-m2-cache.")
+          failedArtifacts.foreach(artifact => {
+            clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
+          })
+          ivy.popContext()
+
+          val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get)
+          noCacheIvy.pushContext()
+
+          val noCacheRr = noCacheIvy.resolve(md, resolveOptions)
+          if (noCacheRr.hasError) {
+            throw new RuntimeException(noCacheRr.getAllProblemMessages.toString)
+          }
+          noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+          val dependencyPaths = resolveDependencyPaths(
+            noCacheRr.getArtifacts.toArray, packagesDirectory)
+          noCacheIvy.popContext()
+
+          dependencyPaths
+        } else {
+          throw new RuntimeException(rr.getAllProblemMessages.toString)
+        }
+      } else {
+        ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+        val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+        ivy.popContext()
+
+        dependencyPaths
       }
-      // retrieve all resolved dependencies
-      retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator +
-        "[organization]_[artifact]-[revision](-[classifier]).[ext]")
-      ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
-        retrieveOptions.setConfs(Array(ivyConfName)))
-      resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     } finally {
       System.setOut(sysOut)
-      clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)
+      clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName)
     }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index a28a0464e6ee..18090b53e3c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -139,6 +139,10 @@ private[hive] object IsolatedClientLoader extends Logging {
         SparkSubmitUtils.buildIvySettings(
           Some(remoteRepos),
           ivyPath),
+        Some(SparkSubmitUtils.buildIvySettings(
+          Some(remoteRepos),
+          ivyPath,
+          useLocalM2AsCache = false)),
         transitive = true,
         exclusions = version.exclusions)
     }
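
For reference, a minimal sketch of how a caller opts into the new fallback path, mirroring the IsolatedClientLoader change above. This is illustrative only: SparkSubmitUtils is private[spark] (so it compiles only from Spark's own packages), and the coordinate below is a placeholder.

    import org.apache.spark.deploy.SparkSubmitUtils

    // Primary settings resolve through the local-m2 cache first (the default).
    val settings = SparkSubmitUtils.buildIvySettings(None, None)

    // Fallback settings skip the local-m2 cache entirely, so a corrupted jar in
    // ~/.m2/repository cannot fail the retry as well.
    val noCacheSettings = SparkSubmitUtils.buildIvySettings(None, None, useLocalM2AsCache = false)

    // The fallback settings are used only when the first resolution reports
    // failed artifact downloads (SPARK-46302); otherwise they are never touched.
    val jarPaths: Seq[String] = SparkSubmitUtils.resolveMavenCoordinates(
      "org.example:demo_2.13:1.0.0", // placeholder coordinate
      settings,
      noCacheIvySettings = Some(noCacheSettings),
      transitive = true)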
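Similarly, the three cache entries that clearInvalidIvyCacheFiles deletes can be seen by expanding its path pattern by hand. A self-contained sketch, assuming a hypothetical coordinate org.example:demo:1.0.0 and the default Ivy cache at ~/.ivy2/cache:

    import java.io.File

    val (groupId, artifactId, version) = ("org.example", "demo", "1.0.0")
    val defaultCache = new File(sys.props("user.home"), ".ivy2/cache")

    // Same three relative paths as the Seq built in clearInvalidIvyCacheFiles.
    Seq(
      s"$groupId${File.separator}$artifactId${File.separator}ivy-$version.xml",
      s"$groupId${File.separator}$artifactId${File.separator}ivy-$version.xml.original",
      s"$groupId${File.separator}$artifactId${File.separator}ivydata-$version.properties"
    ).foreach { filename =>
      // e.g. /home/user/.ivy2/cache/org.example/demo/ivy-1.0.0.xml
      println(new File(defaultCache, filename).getAbsolutePath)
    }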