
Commit c2343f7

yaooqinn authored and dongjoon-hyun committed
[SPARK-45265][SQL] Support Hive 4.0 metastore
### What changes were proposed in this pull request? This PR continues the work from #43064 and #45801 to support Hive Metastore Server 4.0. CHAR/VARCHAR type partition filter pushdown is not included in this PR, as it requires further investment. ### Why are the changes needed? Enhance the multiple hive metastore server support feature ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? Passing HiveClient*Suites w/ 4.0 ### Was this patch authored or co-authored using generative AI tooling? no Closes #48823 from yaooqinn/SPARK-45265. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 0aee601 commit c2343f7

File tree

15 files changed: +413 −87 lines

docs/sql-data-sources-hive-tables.md

Lines changed: 1 addition & 1 deletion

@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
     <td><code>2.3.10</code></td>
     <td>
       Version of the Hive metastore. Available
-      options are <code>2.0.0</code> through <code>2.3.10</code> and <code>3.0.0</code> through <code>3.1.3</code>.
+      options are <code>2.0.0</code> through <code>2.3.10</code>, <code>3.0.0</code> through <code>3.1.3</code>, and <code>4.0.0</code> through <code>4.0.1</code>.
     </td>
     <td>1.4.0</td>
   </tr>
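
Picking up the new range in practice means setting the static metastore configs before the session starts. A minimal sketch, assuming a Hive 4.0.1 metastore at a hypothetical thrift URI and client jars resolved from Maven (neither value comes from this commit):

```scala
import org.apache.spark.sql.SparkSession

// Static confs: these must be set before the SparkSession is created.
val spark = SparkSession.builder()
  .appName("hive-4-metastore-sketch")
  .config("spark.sql.hive.metastore.version", "4.0.1")
  // Resolve matching Hive client jars from Maven; "builtin" only serves the
  // bundled 2.3.x client, and "path" takes an explicit classpath instead.
  .config("spark.sql.hive.metastore.jars", "maven")
  .config("hive.metastore.uris", "thrift://localhost:9083") // hypothetical URI
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SHOW DATABASES").show()
```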

project/SparkBuild.scala

Lines changed: 8 additions & 0 deletions

@@ -1183,6 +1183,14 @@ object Hive {
     // Hive tests need higher metaspace size
     (Test / javaOptions) := (Test / javaOptions).value.filterNot(_.contains("MaxMetaspaceSize")),
     (Test / javaOptions) += "-XX:MaxMetaspaceSize=2g",
+    // SPARK-45265: HivePartitionFilteringSuite addPartitions-related tests generate very long
+    // direct SQL against the Derby server, which may cause a stack overflow error when Derby
+    // parses the SQL.
+    // We need to increase Xss for the tests. Meanwhile, QueryParsingErrorsSuite requires a
+    // smaller Xss to mock a FAILED_TO_PARSE_TOO_COMPLEX error, so we need to set it for the
+    // hive module specifically.
+    (Test / javaOptions) := (Test / javaOptions).value.filterNot(_.contains("Xss")),
+    (Test / javaOptions) += "-Xss64m",
     // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings
     // only for this subproject.
     scalacOptions := (scalacOptions map { currentOpts: Seq[String] =>

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala

Lines changed: 20 additions & 19 deletions

@@ -458,27 +458,28 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       if (sessionState.getIsVerbose) {
         out.println(cmd)
       }
-      val rc = driver.run(cmd)
-      val endTimeNs = System.nanoTime()
-      val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0
-
-      ret = rc.getResponseCode
-      if (ret != 0) {
-        val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat
-        val e = rc.getException
-        val msg = e match {
-          case st: SparkThrowable with Throwable => SparkThrowableHelper.getMessage(st, format)
-          case _ => e.getMessage
-        }
-        err.println(msg)
-        if (format == ErrorMessageFormat.PRETTY &&
+      try {
+        driver.run(cmd)
+      } catch {
+        case t: Throwable =>
+          ret = 1
+          val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat
+          val msg = t match {
+            case st: SparkThrowable with Throwable =>
+              SparkThrowableHelper.getMessage(st, format)
+            case _ => t.getMessage
+          }
+          err.println(msg)
+          if (format == ErrorMessageFormat.PRETTY &&
             !sessionState.getIsSilent &&
-            (!e.isInstanceOf[AnalysisException] || e.getCause != null)) {
-          e.printStackTrace(err)
-        }
-        driver.close()
-        return ret
+            (!t.isInstanceOf[AnalysisException] || t.getCause != null)) {
+            t.printStackTrace(err)
+          }
+          driver.close()
+          return ret
       }
+      val endTimeNs = System.nanoTime()
+      val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0
 
       val res = new JArrayList[String]()

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala

Lines changed: 3 additions & 3 deletions

@@ -31,7 +31,7 @@ import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.COMMAND
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
-import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

@@ -82,10 +82,10 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
     } catch {
       case st: SparkThrowable =>
         logDebug(s"Failed in [$command]", st)
-        new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st)
+        throw st
       case cause: Throwable =>
         logError(log"Failed in [${MDC(COMMAND, command)}]", cause)
-        new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause)
+        throw new QueryExecutionException(ExceptionUtils.getStackTrace(cause))
     }
   }
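
With this change, a failing command no longer produces an error `CommandProcessorResponse`: a `SparkThrowable` is rethrown as-is and anything else is wrapped in `QueryExecutionException`. A minimal caller-side sketch of that contract, where `runCommand` is a hypothetical stand-in for `SparkSQLDriver.run`:

```scala
import org.apache.spark.sql.execution.QueryExecutionException

// Hypothetical stand-in for SparkSQLDriver.run: it now succeeds or throws.
def runCommand(sql: String): Unit =
  if (sql.startsWith("bad")) throw new QueryExecutionException(s"Failed in [$sql]")

// Callers move from inspecting getResponseCode to a try/catch, as the
// SparkSQLCLIDriver change above does.
def execute(sql: String): Int =
  try { runCommand(sql); 0 }
  catch { case t: Throwable => Console.err.println(t.getMessage); 1 }
```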

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 1 addition & 1 deletion

@@ -1030,7 +1030,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val metaStoreParts = partsWithLocation
       .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
-    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
+    client.createPartitions(tableMeta, metaStoreParts, ignoreIfExists)
   }
 
   override def dropPartitions(

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 3 additions & 2 deletions

@@ -74,8 +74,9 @@ private[spark] object HiveUtils extends Logging {
 
   val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
-      "<code>2.0.0</code> through <code>2.3.10</code> and " +
-      "<code>3.0.0</code> through <code>3.1.3</code>.")
+      "<code>2.0.0</code> through <code>2.3.10</code>, " +
+      "<code>3.0.0</code> through <code>3.1.3</code> and " +
+      "<code>4.0.0</code> through <code>4.0.1</code>.")
     .version("1.4.0")
     .stringConf
     .checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version")

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala

Lines changed: 1 addition & 2 deletions

@@ -164,8 +164,7 @@ private[hive] trait HiveClient {
    * Create one or many partitions in the given table.
    */
   def createPartitions(
-      db: String,
-      table: String,
+      table: CatalogTable,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit
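
Callers now hand the client the full `CatalogTable` rather than `(db, table)` strings, which lets implementations derive both the identifier and Hive-side metadata from one object. A hedged sketch of the new call shape (the table and partition values below are invented for illustration):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTablePartition, CatalogTableType}
import org.apache.spark.sql.types.StructType

// A minimal partitioned-table descriptor, for illustration only.
val tableMeta = CatalogTable(
  identifier = TableIdentifier("t", Some("db")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("id", "int").add("p", "string"),
  partitionColumnNames = Seq("p"))
val part = CatalogTablePartition(Map("p" -> "1"), CatalogStorageFormat.empty)

// Old shape: client.createPartitions("db", "t", Seq(part), ignoreIfExists = true)
// New shape:
// client.createPartitions(tableMeta, Seq(part), ignoreIfExists = true)
```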

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 70 additions & 28 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.PrintStream
+import java.io.{OutputStream, PrintStream}
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8

@@ -28,6 +28,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst

@@ -44,7 +45,7 @@ import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkException, SparkThrowable}
 import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._

@@ -121,6 +122,7 @@ private[hive] class HiveClientImpl(
     case hive.v2_3 => new Shim_v2_3()
     case hive.v3_0 => new Shim_v3_0()
     case hive.v3_1 => new Shim_v3_1()
+    case hive.v4_0 => new Shim_v4_0()
   }
 
   // Create an internal session state for this HiveClientImpl.

@@ -177,8 +179,10 @@ private[hive] class HiveClientImpl(
     // got changed. We reset it to clientLoader.ClassLoader here.
     state.getConf.setClassLoader(clientLoader.classLoader)
     shim.setCurrentSessionState(state)
-    state.out = new PrintStream(outputBuffer, true, UTF_8.name())
-    state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+    val clz = state.getClass.getField("out").getType.asInstanceOf[Class[_ <: PrintStream]]
+    val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String])
+    state.getClass.getField("out").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))
+    state.getClass.getField("err").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))
     state
   }

@@ -307,15 +311,27 @@ private[hive] class HiveClientImpl(
   }
 
   def setOut(stream: PrintStream): Unit = withHiveState {
-    state.out = stream
+    val ctor = state.getClass.getField("out")
+      .getType
+      .asInstanceOf[Class[_ <: PrintStream]]
+      .getConstructor(classOf[OutputStream])
+    state.getClass.getField("out").set(state, ctor.newInstance(stream))
   }
 
   def setInfo(stream: PrintStream): Unit = withHiveState {
-    state.info = stream
+    val ctor = state.getClass.getField("info")
+      .getType
+      .asInstanceOf[Class[_ <: PrintStream]]
+      .getConstructor(classOf[OutputStream])
+    state.getClass.getField("info").set(state, ctor.newInstance(stream))
   }
 
   def setError(stream: PrintStream): Unit = withHiveState {
-    state.err = stream
+    val ctor = state.getClass.getField("err")
+      .getType
+      .asInstanceOf[Class[_ <: PrintStream]]
+      .getConstructor(classOf[OutputStream])
+    state.getClass.getField("err").set(state, ctor.newInstance(stream))
   }
 
   private def setCurrentDatabaseRaw(db: String): Unit = {
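
The same reflective pattern appears three times above, so it is worth unpacking: instead of assigning `state.out` directly, which would bind at compile time to the field type of whichever Hive version Spark compiled against, the code resolves the field's declared type at runtime and instantiates it through that type's own constructor. Hive 4.0 declares these `SessionState` fields with a more specific `PrintStream` subclass, so a direct assignment no longer links against all supported versions. A standalone sketch of the idea, assuming the target exposes a public `PrintStream`-typed field whose type has the usual `(OutputStream, boolean, String)` constructor:

```scala
import java.io.{OutputStream, PrintStream}
import java.nio.charset.StandardCharsets.UTF_8

// Rebind a public PrintStream-typed field reflectively. The constructor is
// resolved from the field's own declared type, so the same bytecode works
// whether the field is a PrintStream or a PrintStream subclass.
def rebindStreamField(target: AnyRef, fieldName: String, sink: OutputStream): Unit = {
  val field = target.getClass.getField(fieldName)
  val streamClass = field.getType.asInstanceOf[Class[_ <: PrintStream]]
  val ctor = streamClass.getConstructor(
    classOf[OutputStream], classOf[Boolean], classOf[String])
  field.set(target, ctor.newInstance(sink, true, UTF_8.name()))
}
```

Calling `rebindStreamField(state, "out", outputBuffer)` would mirror what the `newState` and `setOut` changes above do.
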
@@ -629,21 +645,22 @@ private[hive] class HiveClientImpl(
   }
 
   override def createPartitions(
-      db: String,
-      table: String,
+      table: CatalogTable,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
     def replaceExistException(e: Throwable): Unit = e match {
       case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
-        val hiveTable = client.getTable(db, table)
+        val db = table.identifier.database.getOrElse(state.getCurrentDatabase)
+        val tableName = table.identifier.table
+        val hiveTable = client.getTable(db, tableName)
         val existingParts = parts.filter { p =>
           shim.getPartitions(client, hiveTable, p.spec.asJava).nonEmpty
         }
-        throw new PartitionsAlreadyExistException(db, table, existingParts.map(_.spec))
+        throw new PartitionsAlreadyExistException(db, tableName, existingParts.map(_.spec))
       case _ => throw e
     }
     try {
-      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+      shim.createPartitions(client, toHiveTable(table), parts, ignoreIfExists)
     } catch {
       case e: InvocationTargetException => replaceExistException(e.getCause)
       case e: Throwable => replaceExistException(e)

@@ -861,11 +878,22 @@ private[hive] class HiveClientImpl(
     // Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
     // and the CommandProcessorFactory.clean function removed.
     driver.getClass.getMethod("close").invoke(driver)
-    if (version != hive.v3_0 && version != hive.v3_1) {
+    if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) {
       CommandProcessorFactory.clean(conf)
     }
   }
 
+  def getResponseCode(response: CommandProcessorResponse): Int = {
+    if (version < hive.v4_0) {
+      response.getResponseCode
+    } else {
+      // Since Hive 4.0, the response code is removed from CommandProcessorResponse.
+      // Here we simply return 0 for the positive cases, as error cases throw
+      // exceptions early.
+      0
+    }
+  }
+
   // Hive query needs to start SessionState.
   SessionState.start(state)
   logDebug(s"Running hiveql '$cmd'")
@@ -878,30 +906,44 @@ private[hive] class HiveClientImpl(
       val proc = shim.getCommandProcessor(tokens(0), conf)
       proc match {
         case driver: Driver =>
-          val response: CommandProcessorResponse = driver.run(cmd)
-          // Throw an exception if there is an error in query processing.
-          if (response.getResponseCode != 0) {
+          try {
+            val response: CommandProcessorResponse = driver.run(cmd)
+            if (getResponseCode(response) != 0) {
+              // Throw an exception if there is an error in query processing.
+              // This works for Hive 3.x and earlier versions.
+              throw new QueryExecutionException(response.getErrorMessage)
+            }
+            driver.setMaxRows(maxRows)
+            val results = shim.getDriverResults(driver)
+            results
+          } catch {
+            case e @ (_: QueryExecutionException | _: SparkThrowable) =>
+              throw e
+            case e: Exception =>
+              // Wrap the original Hive error with QueryExecutionException and throw it
+              // if there is an error in query processing.
+              // This works for Hive 4.x and later versions.
+              throw new QueryExecutionException(ExceptionUtils.getStackTrace(e))
+          } finally {
             closeDriver(driver)
-            throw new QueryExecutionException(response.getErrorMessage)
           }
-          driver.setMaxRows(maxRows)
-
-          val results = shim.getDriverResults(driver)
-          closeDriver(driver)
-          results
 
         case _ =>
-          if (state.out != null) {
+          val out = state.getClass.getField("out").get(state)
+          if (out != null) {
             // scalastyle:off println
-            state.out.println(tokens(0) + " " + cmd_1)
+            out.asInstanceOf[PrintStream].println(tokens(0) + " " + cmd_1)
             // scalastyle:on println
           }
           val response: CommandProcessorResponse = proc.run(cmd_1)
-          // Throw an exception if there is an error in query processing.
-          if (response.getResponseCode != 0) {
+          val responseCode = getResponseCode(response)
+          if (responseCode != 0) {
+            // Throw an exception if there is an error in query processing.
+            // This works for Hive 3.x and earlier versions. For 4.x and later
+            // versions, it will go to the catch block directly.
             throw new QueryExecutionException(response.getErrorMessage)
           }
-          Seq(response.getResponseCode.toString)
+          Seq(responseCode.toString)
       }
     } catch {
       case e: Exception =>

@@ -971,7 +1013,7 @@ private[hive] class HiveClientImpl(
       partSpec,
       replace,
       numDP,
-      listBucketingEnabled = hiveTable.isStoredAsSubDirectories)
+      hiveTable)
   }
 
   override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
