@@ -210,10 +210,34 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
}

protected[hive] def sessionState = tlSession.get().asInstanceOf[this.SQLSession].sessionState
/**
* SQLConf and HiveConf contracts:
*
* 1. Reuse the existing started SessionState if there is one.
* 2. When the Hive session is first initialized, params in HiveConf are picked up by the
* SQLConf. Additionally, any property set by setConf() or a SET command inside sql() is
* set in the SQLConf *as well as* in the HiveConf.
*/
@transient protected[hive] lazy val sessionState: SessionState = {
var state = SessionState.get()
if (state == null) {
state = new SessionState(overrideHiveConf(new HiveConf(classOf[SessionState])))
SessionState.start(state)
}
if (state.out == null) {
state.out = new PrintStream(outputBuffer, true, "UTF-8")
}
if (state.err == null) {
state.err = new PrintStream(outputBuffer, true, "UTF-8")
}
state
}

protected[hive] def hiveconf = tlSession.get().asInstanceOf[this.SQLSession].hiveconf

// Sub-classes of HiveContext for testing purposes can override Hive configurations here.
protected def overrideHiveConf(conf: HiveConf): HiveConf = conf

override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
runSqlHive(s"SET $key=$value")
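
Below is a minimal sketch (not part of this change) of the SQLConf/HiveConf contract described above, assuming a local SparkContext and a HiveContext named `hc`; it uses only the public setConf/getConf/sql accessors, since `hiveconf` itself is package-private:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ConfContractSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("conf-contract"))
    val hc = new HiveContext(sc)

    // setConf writes the value into the SQLConf and, via runSqlHive("SET key=value"),
    // into the session's HiveConf as well.
    hc.setConf("hive.exec.dynamic.partition", "true")
    assert(hc.getConf("hive.exec.dynamic.partition") == "true")

    // Per the contract above, a SET command issued through sql() follows the same path.
    hc.sql("SET hive.exec.compress.output=false").collect()
    assert(hc.getConf("hive.exec.compress.output") == "false")

    sc.stop()
  }
}
```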
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.test
import java.io.File
import java.util.{Set => JavaSet}

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.io.avro.{AvroContainerInputFormat, AvroContainerOutputFormat}
import org.apache.hadoop.hive.ql.metadata.Table
@@ -72,13 +73,20 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath
lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath

/** Sets up the system initially or after a RESET command */
protected def configure(): Unit = {
setConf("javax.jdo.option.ConnectionURL",
/** Overrides metastore and warehouse paths */
override protected def overrideHiveConf(conf: HiveConf): HiveConf = {
conf.set(
"javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastorePath;create=true")
setConf("hive.metastore.warehouse.dir", warehousePath)

conf.set(
"hive.metastore.warehouse.dir",
warehousePath)

Utils.registerShutdownDeleteDir(new File(warehousePath))
Utils.registerShutdownDeleteDir(new File(metastorePath))

conf
}

val testTempDir = File.createTempFile("testTempFiles", "spark.hive.tmp")
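
As a further illustration of the overrideHiveConf hook (the class name and config key below are hypothetical, not part of this change), any test-oriented subclass can adjust the HiveConf before the SessionState is started:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// Hypothetical test-only context: redirects Hive scratch space to a caller-provided directory.
class ScratchDirHiveContext(sc: SparkContext, scratchDir: String) extends HiveContext(sc) {
  override protected def overrideHiveConf(conf: HiveConf): HiveConf = {
    conf.set("hive.exec.scratchdir", scratchDir) // applied before SessionState.start(state)
    conf
  }
}
```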
@@ -89,8 +97,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// For some Hive test cases that contain ${system:test.tmp.dir}
System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)

configure() // Must be called before initializing the catalog below.

/** The location of the compiled hive distribution */
lazy val hiveHome = envVarToFile("HIVE_HOME")
/** The location of the hive source code. */
@@ -389,9 +395,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
*/
protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames

// The default database may not exist in 0.13.1; create it if it does not exist
HiveShim.createDefaultDBIfNeeded(this)

/**
* Resets the test instance by deleting any tables that have been created.
* TODO: also clear out UDFs, views, etc.
@@ -442,7 +445,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
runSqlHive("set datanucleus.cache.collections.lazy=true")
// Lots of tests fail if we do not change the partition whitelist from the default.
runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
configure()

runSqlHive("USE default")

@@ -189,8 +189,6 @@ private[hive] object HiveShim {

def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE

def createDefaultDBIfNeeded(context: HiveContext) = { }

def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
CommandProcessorFactory.get(cmd(0), conf)
}
@@ -51,7 +51,7 @@ import org.apache.spark.sql.types.{Decimal, DecimalType}
/**
* This class provides UDF creation as well as UDF instance serialization and
* de-serialization across process boundaries.
*
*
* A detailed discussion can be found at https://github.com/apache/spark/pull/3640
*
* @param functionClassName UDF class name
@@ -301,11 +301,6 @@ private[hive] object HiveShim {

def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE

def createDefaultDBIfNeeded(context: HiveContext) = {
context.runSqlHive("CREATE DATABASE default")
context.runSqlHive("USE default")
}

def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
CommandProcessorFactory.get(cmd, conf)
}
@@ -400,7 +395,7 @@ private[hive] object HiveShim {
Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
}
}

/*
* Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
* needs to be initialized before serialization.