diff --git a/repl/pom.xml b/repl/pom.xml
index edfa1c7f2c29..f95ab017977a 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -149,6 +149,16 @@
+    <profile>
+      <id>hive</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-hive_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>scala-2.11</id>
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 934daaeaafca..65f68fd60888 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -22,6 +22,7 @@
+import java.io.File
 import java.net.URLClassLoader
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.scalatest.FunSuite
 import org.apache.spark.SparkContext
 import org.apache.commons.lang3.StringEscapeUtils
@@ -77,6 +78,23 @@ class ReplSuite extends FunSuite {
       "Interpreter output contained '" + message + "':\n" + output)
   }
 
+  def getHiveFile(path: String): File = {
+    val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
+      new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
+    } else {
+      new File(".." + File.separator + "sql" + File.separator + "hive" + File.separator + "src" +
+        File.separator + "test" + File.separator + "resources")
+    }
+
+    val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar)
+    // The location of the Hive source tree, if one is available.
+    lazy val hiveDevHome = Option(System.getenv("HIVE_DEV_HOME")).map(new File(_))
+    hiveDevHome
+      .map(new File(_, stripped))
+      .filter(_.exists)
+      .getOrElse(new File(inRepoTests, stripped))
+  }
+
   test("propagation of local properties") {
     // A mock ILoop that doesn't install the SIGINT handler.
     class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) {
@@ -327,4 +345,41 @@ class ReplSuite extends FunSuite {
     assertDoesNotContain("Exception", output)
     assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
   }
+
+  test("SPARK-5818 ADD JAR in hql from spark-shell") {
+    val testJar = getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
+    val output = runInterpreter("local",
+      s"""
+        |import org.apache.spark.SparkContext
+        |import org.apache.spark.sql.SQLContext
+        |var ignoredTest = false
+        |try {
+        |  val hiveContextClass =
+        |    Thread.currentThread.getContextClassLoader.loadClass("org.apache.spark.sql.hive.HiveContext")
+        |  val hiveContext = hiveContextClass.getConstructor(classOf[SparkContext])
+        |    .newInstance(sc).asInstanceOf[SQLContext]
+        |  hiveContext.sql("add jar $testJar")
+        |} catch {
+        |  case _: java.lang.ClassNotFoundException =>
+        |    println("Ignoring test: no HiveContext found")
+        |    ignoredTest = true
+        |}
+        |if (!ignoredTest) {
+        |  // test if the jar is accessible to all executors
+        |  sc.parallelize(1 to 4).foreach { x =>
+        |    Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
+        |  }
+        |  // test if the jar is accessible to the driver
+        |  Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
+        |}
+      """.stripMargin)
+    if (output.contains("Ignoring test:")) {
+      markup("Ignored test: no HiveContext found")
+    } else {
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+    }
+  }
 }
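Aside: because the repl module only depends on spark-hive under the new `hive` profile, the test above cannot reference HiveContext at compile time and instead loads it reflectively. A minimal standalone sketch of that pattern (the helper name `tryHiveContext` is illustrative, not part of the patch):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // Yields Some(HiveContext viewed as a SQLContext) when spark-hive is on
    // the classpath, None otherwise; no compile-time dependency is needed.
    def tryHiveContext(sc: SparkContext): Option[SQLContext] =
      try {
        val clazz = Thread.currentThread.getContextClassLoader
          .loadClass("org.apache.spark.sql.hive.HiveContext")
        Some(clazz.getConstructor(classOf[SparkContext])
          .newInstance(sc).asInstanceOf[SQLContext])
      } catch {
        case _: ClassNotFoundException => None
      }

The scala-2.11 suite below gets the same helper and test, modulo the different SparkILoop constructor.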
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 14f5e9ed4f25..7d597d368210 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -81,7 +81,24 @@ class ReplSuite extends FunSuite {
     assert(!isContain,
       "Interpreter output contained '" + message + "':\n" + output)
   }
 
+  def getHiveFile(path: String): File = {
+    val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
+      new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
+    } else {
+      new File(".." + File.separator + "sql" + File.separator + "hive" + File.separator + "src" +
+        File.separator + "test" + File.separator + "resources")
+    }
+    val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar)
+    // The location of the Hive source tree, if one is available.
+    lazy val hiveDevHome = Option(System.getenv("HIVE_DEV_HOME")).map(new File(_))
+    hiveDevHome
+      .map(new File(_, stripped))
+      .filter(_.exists)
+      .getOrElse(new File(inRepoTests, stripped))
+  }
+
   test("propagation of local properties") {
     // A mock ILoop that doesn't install the SIGINT handler.
     class ILoop(out: PrintWriter) extends SparkILoop(None, out) {
@@ -332,4 +349,41 @@ class ReplSuite extends FunSuite {
     assertDoesNotContain("Exception", output)
     assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
   }
+
+  test("SPARK-5818 ADD JAR in hql from spark-shell") {
+    val testJar = getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
+    val output = runInterpreter("local",
+      s"""
+        |import org.apache.spark.SparkContext
+        |import org.apache.spark.sql.SQLContext
+        |var ignoredTest = false
+        |try {
+        |  val hiveContextClass =
+        |    Thread.currentThread.getContextClassLoader.loadClass("org.apache.spark.sql.hive.HiveContext")
+        |  val hiveContext = hiveContextClass.getConstructor(classOf[SparkContext])
+        |    .newInstance(sc).asInstanceOf[SQLContext]
+        |  hiveContext.sql("add jar $testJar")
+        |} catch {
+        |  case _: java.lang.ClassNotFoundException =>
+        |    println("Ignoring test: no HiveContext found")
+        |    ignoredTest = true
+        |}
+        |if (!ignoredTest) {
+        |  // test if the jar is accessible to all executors
+        |  sc.parallelize(1 to 4).foreach { x =>
+        |    Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
+        |  }
+        |  // test if the jar is accessible to the driver
+        |  Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
+        |}
+      """.stripMargin)
+    if (output.contains("Ignoring test:")) {
+      markup("Ignored test: no HiveContext found")
+    } else {
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4345ffbf30f7..7a49f999f9ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.File
+import java.net.URI
+import java.net.URLClassLoader
+
+import scala.tools.nsc.util.ScalaClassLoader
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.util._
@@ -27,6 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkFiles
+import org.apache.spark.util.Utils
 
 /**
  * Analyzes the given table in the current database to generate statistics, which will be
@@ -76,13 +84,48 @@ private[hive] case class AddJar(path: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    hiveContext.runSqlHive(s"ADD JAR $path")
-    hiveContext.sparkContext.addJar(path)
+    val currClassLoader = Utils.getContextOrSparkClassLoader
+    currClassLoader match {
+      // Hive can only register jars on a plain URLClassLoader.
+      case _: URLClassLoader =>
+        val hiveContext = sqlContext.asInstanceOf[HiveContext]
+        hiveContext.runSqlHive(s"ADD JAR $path")
+        hiveContext.sparkContext.addJar(path)
+      // Otherwise, try adding the jar's file URL to the parent class loader.
+      // This is the spark-shell case: the REPL class loader is not a
+      // URLClassLoader, but its parent is.
+      case _ =>
+        currClassLoader.getParent match {
+          case c: ScalaClassLoader.URLClassLoader =>
+            sqlContext.sparkContext.addJar(path)
+            val filenamePath = downloadFile(sqlContext.sparkContext)
+            c.addURL(new File(filenamePath).toURI.toURL)
+          case _ =>
+            val loaderName = currClassLoader.getClass.getName
+            throw new AnalysisException(
+              s"Current class loader $loaderName does not support ADD JAR.")
+        }
+    }
     Seq.empty[Row]
   }
-}
+
+  // Returns a local filesystem path for the jar: file: and local: URIs are
+  // used as-is, anything else is fetched into the SparkFiles root first.
+  private def downloadFile(sparkContext: SparkContext): String = {
+    val filename = path.split("/").last
+    val uri = new URI(path)
+    uri.getScheme match {
+      case null | "local" | "file" => uri.getPath
+      case _ =>
+        Utils.fetchFile(
+          path,
+          new File(SparkFiles.getRootDirectory()),
+          sparkContext.conf,
+          sparkContext.env.securityManager,
+          sparkContext.hadoopConfiguration,
+          System.currentTimeMillis,
+          useCache = false)
+        SparkFiles.get(filename)
+    }
+  }
+}
 
 private[hive] case class AddFile(path: String) extends RunnableCommand {
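Taken together, the new AddJar.run handles the spark-shell, where the context class loader is the REPL's loader rather than a URLClassLoader. A hedged sketch of the resulting control flow for a remote jar (the jar path is illustrative; method names are from the diff):

    // From spark-shell: sqlContext.sql("ADD JAR hdfs:///tmp/TestSerDe.jar")
    //  1. Utils.getContextOrSparkClassLoader is not a URLClassLoader, so the
    //     plain Hive branch is skipped.
    //  2. sparkContext.addJar(path) ships the jar to every executor.
    //  3. downloadFile(...) fetches the jar into SparkFiles.getRootDirectory()
    //     and returns the local path (a no-op for file: and local: URIs).
    //  4. c.addURL(...) appends the local jar to the REPL loader's parent, a
    //     ScalaClassLoader.URLClassLoader, so driver-side class loading works.
    val testJar = "/tmp/TestSerDe.jar"    // illustrative local path
    sqlContext.sql(s"ADD JAR $testJar")   // registers on driver and executors
    sc.parallelize(1 to 4).foreach { _ => // executor-side visibility check,
      Thread.currentThread.getContextClassLoader             // as in the test
        .loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
    }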