repl/pom.xml: 10 additions & 0 deletions
@@ -149,6 +149,16 @@
</plugins>
</build>
<profiles>
<profile>
<id>hive</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
Contributor:
We can't make this change, as I think it means we will always build Hive with the Spark assembly, correct?

Contributor Author:
No, the dependency is added only under the hive profile. If the assembly is built with the -Phive option, the hive dependency is added to repl, so Hive is on the classpath for the REPL. If the assembly is built without -Phive, the dependency is not added and the test case is ignored. The test case also checks at runtime whether the HiveContext class is available, and ignores itself if it is not.
I have manually tested both cases, building with -Phive and without it; it does not impact assembly creation.
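For reference, a minimal sketch of that runtime check (it mirrors what the new test below does; it is not additional PR code):

val hiveOnClasspath =
  try {
    // spark-hive puts HiveContext on the classpath only when the assembly is built with -Phive
    Thread.currentThread.getContextClassLoader
      .loadClass("org.apache.spark.sql.hive.HiveContext")
    true
  } catch {
    case _: ClassNotFoundException => false
  }
// the Hive-dependent part of the test runs only when hiveOnClasspath is true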

<id>scala-2.11</id>
<activation>
@@ -22,6 +22,7 @@ import java.net.URLClassLoader

import scala.collection.mutable.ArrayBuffer

import com.google.common.io.ByteStreams
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.commons.lang3.StringEscapeUtils
@@ -77,6 +78,23 @@ class ReplSuite extends FunSuite {
"Interpreter output contained '" + message + "':\n" + output)
}

def getHiveFile(path: String): File = {
val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
} else {
new File(".." + File.separator + "sql" + File.separator + "hive" + File.separator + "src"
+ File.separator + "test" + File.separator + "resources")
}

val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar)
/** The location of the hive source code. */
lazy val hiveDevHome = Option(System.getenv("HIVE_DEV_HOME")).map(new File(_))
hiveDevHome
.map(new File(_, stripped))
.filter(_.exists)
.getOrElse(new File(inRepoTests, stripped))
}

test("propagation of local properties") {
// A mock ILoop that doesn't install the SIGINT handler.
class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) {
@@ -327,4 +345,41 @@ class ReplSuite extends FunSuite {
assertDoesNotContain("Exception", output)
assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
}

test("SPARK-5818 ADD JAR in hql from spark-shell") {
val testJar = getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
val output = runInterpreter("local",
s"""
|import org.apache.spark.SparkContext
|import org.apache.spark.sql.SQLContext
|var ignoredTest = false
|try {
| val hiveContextClass =
| Thread.currentThread.getContextClassLoader.loadClass("org.apache.spark.sql.hive.HiveContext")
| val hiveContext = hiveContextClass.getConstructor(classOf[SparkContext])
| .newInstance(sc).asInstanceOf[SQLContext]
| hiveContext.sql("add jar $testJar")
|} catch {
| case cnf: java.lang.ClassNotFoundException =>
| println("Ignoring test: as no HiveContext found")
| ignoredTest = true
|}
|if (ignoredTest == false) {
| val testData = Array((1,1))
| // test if jar is accessible to all executors
| sc.parallelize(1 to 4).foreach{ x =>
| Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
| }
| // test if jar is accessible to driver
| Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
|}
|
""".stripMargin)
if (output.contains("Ignoring test:")) {
markup("Ignored test as no HiveContext found")
} else {
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
}
}
}
@@ -81,7 +81,24 @@ class ReplSuite extends FunSuite {
assert(!isContain,
"Interpreter output contained '" + message + "':\n" + output)
}

def getHiveFile(path: String): File = {
val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
} else {
new File(".." + File.separator + "sql" + File.separator + "hive" + File.separator + "src"
+ File.separator + "test" + File.separator + "resources")
}

val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar)
/** The location of the hive source code. */
lazy val hiveDevHome = Option(System.getenv("HIVE_DEV_HOME")).map(new File(_))
hiveDevHome
.map(new File(_, stripped))
.filter(_.exists)
.getOrElse(new File(inRepoTests, stripped))
}

test("propagation of local properties") {
// A mock ILoop that doesn't install the SIGINT handler.
class ILoop(out: PrintWriter) extends SparkILoop(None, out) {
@@ -332,4 +349,41 @@ class ReplSuite extends FunSuite {
assertDoesNotContain("Exception", output)
assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
}

test("SPARK-5818 ADD JAR in hql from spark-shell") {
val testJar = getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
val output = runInterpreter("local",
s"""
|import org.apache.spark.SparkContext
|import org.apache.spark.sql.SQLContext
|var ignoredTest = false
|try {
| val hiveContextClass =
| Thread.currentThread.getContextClassLoader.loadClass("org.apache.spark.sql.hive.HiveContext")
| val hiveContext = hiveContextClass.getConstructor(classOf[SparkContext])
| .newInstance(sc).asInstanceOf[SQLContext]
| hiveContext.sql("add jar $testJar")
|} catch {
| case cnf: java.lang.ClassNotFoundException =>
| println("Ignoring test: as no HiveContext found")
| ignoredTest = true
|}
|if (ignoredTest == false) {
| val testData = Array((1,1))
| // test if jar is accessible to all executors
| sc.parallelize(1 to 4).foreach{ x =>
| Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
| }
| // test if jar is accessible to driver
| Thread.currentThread.getContextClassLoader.loadClass("org.apache.hadoop.hive.serde2.TestSerDe")
|}
|
""".stripMargin)
if (output.contains("Ignoring test:")) {
markup("Ignored test as no HiveContext found")
} else {
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
}
}
}
@@ -17,6 +17,11 @@

package org.apache.spark.sql.hive.execution

import java.io.File
import java.net.URI
import java.net.URLClassLoader

import scala.tools.nsc.util.ScalaClassLoader
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.util._
@@ -27,6 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.SparkFiles
import org.apache.spark.SparkContext
import org.apache.spark.util.Utils

/**
* Analyzes the given table in the current database to generate statistics, which will be
@@ -76,13 +84,48 @@ private[hive]
case class AddJar(path: String) extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.runSqlHive(s"ADD JAR $path")
hiveContext.sparkContext.addJar(path)
val currClassLoader = Utils.getContextOrSparkClassLoader
currClassLoader match {
// only URLClassLoader is supported by Hive
case u: URLClassLoader =>
val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.runSqlHive(s"ADD JAR $path")
hiveContext.sparkContext.addJar(path)
// try adding the file URL to the parent class loader, useful for spark-shell
case _ =>
currClassLoader.getParent match {
case c : ScalaClassLoader.URLClassLoader =>
sqlContext.sparkContext.addJar(path)
val filenamePath = downloadFile(sqlContext.sparkContext)
val uri = new URI("file://" + filenamePath)
c.addURL(uri.toURL)
case _ =>
val classLoaderName = currClassLoader.getClass.getName
throw new AnalysisException(
s"Current class loader $classLoaderName does not support Add JAR.")
}
}
Seq.empty[Row]
}
}

private def downloadFile(sparkContext: SparkContext): String = {
val filename = path.split("/").last
val uri = new URI(path)
val filenamePath: String = uri.getScheme match {
case null | "local" | "file" => uri.getPath
case _ =>
Utils.fetchFile(path,
new File(SparkFiles.getRootDirectory()),
sparkContext.conf,
sparkContext.env.securityManager,
sparkContext.hadoopConfiguration,
System.currentTimeMillis, useCache = false)
SparkFiles.get(filename)
}
filenamePath
}
}

private[hive]
case class AddFile(path: String) extends RunnableCommand {

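For context, a hedged usage sketch of the new AddJar path from spark-shell (the jar path is an example and sc is the shell's SparkContext; this is not part of the PR):

// Create a HiveContext as the new REPL test does, then route ADD JAR through the command above.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("ADD JAR /tmp/TestSerDe.jar")
// AddJar appends the jar URL to the REPL's parent URLClassLoader and calls sc.addJar,
// so the SerDe class now resolves on the driver as well as on the executors.
Thread.currentThread.getContextClassLoader
  .loadClass("org.apache.hadoop.hive.serde2.TestSerDe")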