@@ -573,7 +573,8 @@ public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory au
public String getQueryId(TOperationHandle opHandle) throws HiveSQLException {
Operation operation = sessionManager.getOperationManager().getOperation(
new OperationHandle(opHandle));
final String queryId = operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
final String queryId = operation.getParentSession().getHiveConf().getVar(
HiveConf.getConfVars("hive.query.id"));
LOG.debug(opHandle + ": getQueryId() " + queryId);
return queryId;
}
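
The recurring pattern in this PR: compile-time references to HiveConf.ConfVars constants are swapped for runtime lookups keyed by the configuration name, via HiveConf.getConfVars(String). A minimal Scala sketch of the two styles follows; the object and method names are illustrative, not part of the change:

import org.apache.hadoop.hive.conf.HiveConf

object ConfVarLookupSketch {
  // Before: compile-time enum reference, which breaks when the constant is renamed or removed.
  //   hiveConf.getVar(HiveConf.ConfVars.HIVEQUERYID)
  // After: resolve the ConfVars entry from its key at runtime.
  def queryId(hiveConf: HiveConf): String =
    hiveConf.getVar(HiveConf.getConfVars("hive.query.id"))
}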
@@ -119,7 +119,7 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo
LOG.warn("Error setting scheduler queue: " + e, e);
}
// Set an explicit session name to control the download directory name
hiveConf.set(ConfVars.HIVESESSIONID.varname,
hiveConf.set("hive.session.id",
sessionHandle.getHandleIdentifier().toString());
// Use thrift transportable formatter
hiveConf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, ThriftFormatter.class.getName());
@@ -406,7 +406,7 @@ public String getPassword() {

@Override
public HiveConf getHiveConf() {
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, FETCH_WORK_SERDE_CLASS);
hiveConf.setVar(HiveConf.getConfVars("hive.fetch.output.serde"), FETCH_WORK_SERDE_CLASS);
return hiveConf;
}

@@ -686,8 +686,8 @@ public void close() throws HiveSQLException {
}

private void cleanupPipeoutFile() {
String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);
String lScratchDir = hiveConf.getVar(HiveConf.getConfVars("hive.exec.local.scratchdir"));
String sessionID = hiveConf.getVar(HiveConf.getConfVars("hive.session.id"));

File[] fileAry = new File(lScratchDir).listFiles(
(dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
@@ -164,7 +164,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// In SparkSQL CLI, we may want to use jars augmented by hiveconf
// hive.aux.jars.path, here we add jars augmented by hiveconf to
// Spark's SessionResourceLoader to obtain these jars.
val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)
val auxJars = HiveConf.getVar(conf, HiveConf.getConfVars("hive.aux.jars.path"))
if (StringUtils.isNotBlank(auxJars)) {
val resourceLoader = SparkSQLEnv.sqlContext.sessionState.resourceLoader
StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_))
@@ -577,7 +577,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
val ret = processCmd(command)
command = ""
lastRet = ret
val ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS)
val ignoreErrors =
HiveConf.getBoolVar(conf, HiveConf.getConfVars("hive.cli.errors.ignore"))
if (ret != 0 && !ignoreErrors) {
CommandProcessorFactory.clean(conf.asInstanceOf[HiveConf])
return ret
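
As an aside (not what this change does): since HiveConf extends Hadoop's Configuration, a boolean flag such as hive.cli.errors.ignore can also be read without the enum at all, at the cost of the caller supplying the default value instead of ConfVars. A sketch under that assumption:

import org.apache.hadoop.hive.conf.HiveConf

// Assumption: false is an acceptable caller-supplied default for this sketch.
def ignoreErrors(conf: HiveConf): Boolean =
  conf.getBoolean("hive.cli.errors.ignore", false)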
@@ -27,7 +27,6 @@ import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkContext, SparkFunSuite}
@@ -117,7 +116,7 @@ class CliSuite extends SparkFunSuite {
""
}
val warehouseConf =
maybeWarehouse.map(dir => s"--hiveconf ${ConfVars.METASTOREWAREHOUSE}=$dir").getOrElse("")
maybeWarehouse.map(dir => s"--hiveconf hive.metastore.warehouse.dir=$dir").getOrElse("")
val command = {
val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
val jdbcUrl = s"jdbc:derby:;databaseName=$metastore;create=true"
@@ -127,8 +126,8 @@
| $extraHive
| --conf spark.ui.enabled=false
| --conf ${SQLConf.LEGACY_EMPTY_CURRENT_DB_IN_CLI.key}=true
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
| --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
| --hiveconf javax.jdo.option.ConnectionURL=$jdbcUrl
| --hiveconf hive.exec.scratchdir=$scratchDirPath
| --hiveconf conf1=conftest
| --hiveconf conf2=1
| $warehouseConf
@@ -251,7 +250,7 @@ class CliSuite extends SparkFunSuite {
try {
runCliWithin(2.minute,
extraArgs =
Seq("--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$sparkWareHouseDir"),
Seq("--conf", s"spark.hadoop.hive.metastore.warehouse.dir=$sparkWareHouseDir"),
maybeWarehouse = None,
useExternalHiveFile = true,
metastore = metastore)(
@@ -262,7 +261,7 @@

// override conf from --hiveconf too
runCliWithin(2.minute,
extraArgs = Seq("--conf", s"spark.${ConfVars.METASTOREWAREHOUSE}=$sparkWareHouseDir"),
extraArgs = Seq("--conf", s"spark.hive.metastore.warehouse.dir=$sparkWareHouseDir"),
metastore = metastore)(
"desc database default;" -> sparkWareHouseDir.getAbsolutePath,
"create database cliTestDb;" -> "",
@@ -281,7 +280,7 @@
runCliWithin(2.minute,
extraArgs = Seq(
"--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}1",
"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=${sparkWareHouseDir}2"),
"--conf", s"spark.hadoop.hive.metastore.warehouse.dir=${sparkWareHouseDir}2"),
metastore = metastore)(
"desc database default;" -> sparkWareHouseDir.getAbsolutePath.concat("1"))
} finally {
@@ -363,7 +362,7 @@ class CliSuite extends SparkFunSuite {
val hiveContribJar = HiveTestJars.getHiveHcatalogCoreJar().getCanonicalPath
runCliWithin(
3.minute,
Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))(
Seq("--conf", s"spark.hadoop.hive.aux.jars.path=$hiveContribJar"))(
"""CREATE TABLE addJarWithHiveAux(key string, val string)
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';""".stripMargin
-> "",
@@ -442,7 +441,7 @@ class CliSuite extends SparkFunSuite {
val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath
runCliWithin(
1.minute,
Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))(
Seq("--conf", s"spark.hadoop.hive.aux.jars.path=$hiveContribJar"))(
"CREATE TEMPORARY FUNCTION example_format AS " +
"'org.apache.hadoop.hive.contrib.udf.example.UDFExampleFormat';" -> "",
"SELECT example_format('%o', 93);" -> "135"
@@ -466,7 +465,7 @@
runCliWithin(
2.minutes,
Seq("--jars", s"$jarFile", "--conf",
s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))(
s"spark.hadoop.hive.aux.jars.path=$hiveContribJar"))(
"CREATE TEMPORARY FUNCTION testjar AS" +
" 'org.apache.spark.sql.hive.execution.UDTFStack';" -> "",
"SELECT testjar(1,'TEST-SPARK-TEST-jar', 28840);" -> "TEST-SPARK-TEST-jar\t28840",
@@ -1120,7 +1120,7 @@ class HiveThriftCleanUpScratchDirSuite extends HiveThriftServer2TestBase {

override protected def extraConf: Seq[String] =
s" --hiveconf ${ConfVars.HIVE_START_CLEANUP_SCRATCHDIR}=true " ::
s"--hiveconf ${ConfVars.SCRATCHDIR}=${tempScratchDir.getAbsolutePath}" :: Nil
s"--hiveconf hive.exec.scratchdir=${tempScratchDir.getAbsolutePath}" :: Nil

test("Cleanup the Hive scratchdir when starting the Hive Server") {
assert(!tempScratchDir.exists())
@@ -1239,12 +1239,12 @@ abstract class HiveThriftServer2TestBase extends SparkFunSuite with BeforeAndAft

s"""$startScript
| --master local
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
| --hiveconf javax.jdo.option.ConnectionURL=$metastoreJdbcUri
| --hiveconf hive.metastore.warehouse.dir=$warehousePath
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$localhost
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
| --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
| --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
| --hiveconf hive.exec.local.scratchdir=$lScratchDir
| --hiveconf $portConf=0
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j2.debug
@@ -135,7 +135,7 @@ trait SharedThriftServer extends SharedSparkSession {
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
sqlContext.setConf(ConfVars.SCRATCHDIR.varname, tempScratchDir.getAbsolutePath)
sqlContext.setConf("hive.exec.scratchdir", tempScratchDir.getAbsolutePath)
sqlContext.setConf(ConfVars.HIVE_START_CLEANUP_SCRATCHDIR.varname, "true")

try {
@@ -92,8 +92,8 @@ class UISeleniumSuite

s"""$startScript
| --master local
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
| --hiveconf javax.jdo.option.ConnectionURL=$metastoreJdbcUri
| --hiveconf hive.metastore.warehouse.dir=$warehousePath
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$localhost
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
| --hiveconf $portConf=0
@@ -433,7 +433,7 @@ private[spark] object HiveUtils extends Logging {
}
}
propMap.put(WAREHOUSE_PATH.key, localMetastore.toURI.toString)
propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
propMap.put("javax.jdo.option.ConnectionURL",
s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
"org.datanucleus.store.rdbms.adapter.DerbyAdapter")
@@ -454,10 +454,10 @@
// Because execution Hive should always connects to an embedded derby metastore.
// We have to remove the value of hive.metastore.uris. So, the execution Hive client connects
// to the actual embedded derby metastore instead of the remote metastore.
// You can search HiveConf.ConfVars.METASTOREURIS in the code of HiveConf (in Hive's repo).
// You can search hive.metastore.uris in the code of HiveConf (in Hive's repo).
// Then, you will find that the local metastore mode is only set to true when
// hive.metastore.uris is not set.
propMap.put(ConfVars.METASTOREURIS.varname, "")
propMap.put("hive.metastore.uris", "")

// The execution client will generate garbage events, therefore the listeners that are generated
// for the execution clients are useless. In order to not output garbage, we don't generate
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
import org.apache.hadoop.hive.ql.Driver
@@ -150,7 +149,7 @@ private[hive] class HiveClientImpl(
// hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
// instance constructed, we need to follow that change here.
warehouseDir.foreach { dir =>
ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
ret.getConf.setVar(HiveConf.getConfVars("hive.metastore.warehouse.dir"), dir)
}
ret
} else {
@@ -161,8 +160,8 @@

// Log the default warehouse location.
logInfo(
s"Warehouse location for Hive client " +
s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
s"Warehouse location for Hive client (version ${version.fullVersion}) is " +
s"${conf.getVar(HiveConf.getConfVars("hive.metastore.warehouse.dir"))}")

private def newState(): SessionState = {
val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
@@ -192,8 +191,8 @@ private[hive] class HiveClientImpl(
// bin/spark-shell, bin/spark-sql and sbin/start-thriftserver.sh to automatically create the
// Derby Metastore when running Spark in the non-production environment.
val isEmbeddedMetaStore = {
val msUri = hiveConf.getVar(ConfVars.METASTOREURIS)
val msConnUrl = hiveConf.getVar(ConfVars.METASTORECONNECTURLKEY)
val msUri = hiveConf.getVar(HiveConf.getConfVars("hive.metastore.uris"))
val msConnUrl = hiveConf.getVar(HiveConf.getConfVars("javax.jdo.option.ConnectionURL"))
(msUri == null || msUri.trim().isEmpty) &&
(msConnUrl != null && msConnUrl.startsWith("jdbc:derby"))
}
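
The detection rule above only treats the metastore as embedded when hive.metastore.uris is unset and the JDO connection URL points at local Derby. A hypothetical standalone helper, with a few assumed inputs, makes the rule concrete:

// Mirrors the check above; the name and example values are assumptions for illustration.
def isEmbeddedMetastore(msUri: String, msConnUrl: String): Boolean =
  (msUri == null || msUri.trim.isEmpty) &&
    (msConnUrl != null && msConnUrl.startsWith("jdbc:derby"))

// isEmbeddedMetastore(null, "jdbc:derby:;databaseName=metastore_db;create=true")   // true
// isEmbeddedMetastore("thrift://metastore-host:9083", "jdbc:derby:;create=true")   // false
// isEmbeddedMetastore("", "jdbc:mysql://db-host/hive")                              // false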
@@ -211,7 +210,7 @@
}

// We use hive's conf for compatibility.
private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
private val retryLimit = conf.getIntVar(HiveConf.getConfVars("hive.metastore.failure.retries"))
private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)

/**
@@ -26,7 +26,6 @@ import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.ShimLoader

import org.apache.spark.SparkConf
@@ -291,7 +290,7 @@ private[hive] class IsolatedClientLoader(

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
val warehouseDir = Option(hadoopConf.get("hive.metastore.warehouse.dir"))
if (!isolationOn) {
return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
baseClassLoader, this)
@@ -27,7 +27,6 @@ import com.esotericsoftware.kryo.io.Output
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc._
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.hadoop.hive.serde2.objectinspector
@@ -140,7 +139,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
// Sets pushed predicates
OrcFilters.createFilter(requiredSchema, filters).foreach { f =>
hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, toKryo(f))
hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
hadoopConf.setBoolean("hive.optimize.index.filter", true)
}
}

@@ -17,8 +17,6 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHiveSingleton

@@ -42,11 +40,11 @@ class HiveSessionStateSuite extends SessionStateSuite with TestHiveSingleton {
test("Clone then newSession") {
val sparkSession = hiveContext.sparkSession
val conf = sparkSession.sparkContext.hadoopConfiguration
val oldValue = conf.get(ConfVars.METASTORECONNECTURLKEY.varname)
val oldValue = conf.get("javax.jdo.option.ConnectionURL")
sparkSession.cloneSession()
sparkSession.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog]
.client.newSession()
val newValue = conf.get(ConfVars.METASTORECONNECTURLKEY.varname)
val newValue = conf.get("javax.jdo.option.ConnectionURL")
assert(oldValue == newValue,
"cloneSession and then newSession should not affect the Derby directory")
}
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config.UI
@@ -50,9 +49,9 @@ class HiveSharedStateSuite extends SparkFunSuite {
// cross sessions.
val initialConfigs = Map("spark.foo" -> "bar",
WAREHOUSE_PATH.key -> warehousePath,
ConfVars.METASTOREWAREHOUSE.varname -> warehousePath,
"hive.metastore.warehouse.dir" -> warehousePath,
CATALOG_IMPLEMENTATION.key -> "hive",
ConfVars.METASTORECONNECTURLKEY.varname ->
"javax.jdo.option.ConnectionURL" ->
s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
GLOBAL_TEMP_DATABASE.key -> tmpDb)

@@ -64,7 +63,7 @@
FileUtils.makeQualified(new Path(warehousePath), sc.hadoopConfiguration).toString
assert(sc.conf.get(WAREHOUSE_PATH.key) === qualifiedWHPath,
"initial warehouse conf in session options can affect application wide spark conf")
assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) === qualifiedWHPath,
assert(sc.hadoopConfiguration.get("hive.metastore.warehouse.dir") === qualifiedWHPath,
"initial warehouse conf in session options can affect application wide hadoop conf")

assert(!state.sparkContext.conf.contains("spark.foo"),
@@ -74,7 +73,7 @@
val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
assert(client.getConf("spark.foo", "") === "bar",
"session level conf should be passed to catalog")
assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, "") === qualifiedWHPath,
assert(client.getConf("hive.metastore.warehouse.dir", "") === qualifiedWHPath,
"session level conf should be passed to catalog")

assert(state.globalTempViewManager.database === tmpDb)
Expand All @@ -84,7 +83,7 @@ class HiveSharedStateSuite extends SparkFunSuite {

assert(!ss2.sparkContext.conf.get(WAREHOUSE_PATH.key).contains(invalidPath),
"warehouse conf in session options can't affect application wide spark conf")
assert(ss2.sparkContext.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !==
assert(ss2.sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") !==
invalidPath, "warehouse conf in session options can't affect application wide hadoop conf")
assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should be passed to catalog")
assert(!ss.conf.get(WAREHOUSE_PATH).contains(invalidPath),