Skip to content
Next Next commit
[SPARK-19458][SQL]load hive jars from local repo which has downloaded
  • Loading branch information
windpiger committed Feb 4, 2017
commit 5c8727dfdd8de8b17cdeab4aba8b316f8063863e
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
Expand Down Expand Up @@ -480,7 +479,12 @@ object SparkSubmit extends CommandLineUtils {
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.jars.ivy"),
OptionAssigner(args.repositories, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.jars.repositories"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a new option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it is used to store user's repos , then we can use it in download hive jars.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to document it in http://spark.apache.org/docs/latest/configuration.html, like what we did for spark.jars.ivy

OptionAssigner(args.sparkProperties.get("spark.jars.ivySettings").orNull,
ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars.ivySettings")
)

// In client mode, launch the application main class directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ private[spark] object HiveUtils extends Logging {
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
// TODO: Support for loading the jars from an already downloaded location.
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
IsolatedClientLoader.forVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[hive] object IsolatedClientLoader extends Logging {
} else {
val (downloadedFiles, actualHadoopVersion) =
try {
(downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
(downloadVersion(resolvedVersion, hadoopVersion, sparkConf, ivyPath), hadoopVersion)
} catch {
case e: RuntimeException if e.getMessage.contains("hadoop") =>
// If the error message contains hadoop, it is probably because the hadoop
Expand All @@ -73,7 +73,7 @@ private[hive] object IsolatedClientLoader extends Logging {
"It is recommended to set jars used by Hive metastore client through " +
"spark.sql.hive.metastore.jars in the production environment.")
sharesHadoopClasses = false
(downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
(downloadVersion(resolvedVersion, "2.4.0", sparkConf, ivyPath), "2.4.0")
}
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
resolvedVersions((resolvedVersion, actualHadoopVersion))
Expand Down Expand Up @@ -102,28 +102,36 @@ private[hive] object IsolatedClientLoader extends Logging {
private def downloadVersion(
version: HiveVersion,
hadoopVersion: String,
sparkConf: SparkConf,
ivyPath: Option[String]): Seq[URL] = {
val hiveArtifacts = version.extraDeps ++
Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
Seq("com.google.guava:guava:14.0.1",
s"org.apache.hadoop:hadoop-client:$hadoopVersion")

// if repositories contain a local repo, it will not download jars from remote repo
val repos: Option[String] = Option(sparkConf.get("spark.jars.repositories")).map {
repo =>
Seq(repo, "http://www.datanucleus.org/downloads/maven2").mkString(",")
}.orElse(Some("http://www.datanucleus.org/downloads/maven2"))

val ivyRepoPath = Option(sparkConf.get("spark.jars.ivy"))
val ivySettings = Option(sparkConf.get("spark.jars.ivySettings")).map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, repos, ivyRepoPath)
}.getOrElse {
SparkSubmitUtils.buildIvySettings(repos, ivyRepoPath)
}

val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
SparkSubmitUtils.buildIvySettings(
Some("http://www.datanucleus.org/downloads/maven2"),
ivyPath),
ivySettings,
exclusions = version.exclusions)
}
val allFiles = classpath.split(",").map(new File(_)).toSet

// TODO: Remove copy logic.
val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")
allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
logInfo(s"Downloaded metastore jars to ${tempDir.getCanonicalPath}")
tempDir.listFiles().map(_.toURI.toURL)
logInfo(s"Downloaded metastore jars location: $classpath")
classpath.split(",").map(new File(_).toURI.toURL)
}

// A map from a given pair of HiveVersion and Hadoop version to jar files.
Expand Down