[SPARK-45265][SQL] Support Hive Metastore Server 4.0
yaooqinn committed Nov 11, 2024
commit 1d13c97121c412b499879d8bc271820dc58619fc
SparkSQLDriver.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.COMMAND
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
-import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

@@ -82,10 +82,10 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
     } catch {
       case st: SparkThrowable =>
         logDebug(s"Failed in [$command]", st)
-        new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st)
+        throw new QueryExecutionException(ExceptionUtils.getStackTrace(st))
       case cause: Throwable =>
         logError(log"Failed in [${MDC(COMMAND, command)}]", cause)
-        new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause)
+        throw new QueryExecutionException(ExceptionUtils.getStackTrace(cause))
     }
   }
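With this change, a failed command in SparkSQLDriver.run is rethrown as a QueryExecutionException instead of being packed into a CommandProcessorResponse error object (an API that changed in Hive 4.x). A minimal caller-side sketch of the new contract; `driver` and the failing statement are illustrative:

```scala
import org.apache.spark.sql.execution.QueryExecutionException

// Hypothetical caller: failures now arrive as exceptions, not response codes.
try {
  driver.run("SELECT * FROM nonexistent_table")
} catch {
  case e: QueryExecutionException =>
    // The message carries the stack trace of the original failure.
    System.err.println(e.getMessage)
}
```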
HiveExternalCatalog.scala
@@ -1030,7 +1030,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val metaStoreParts = partsWithLocation
       .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
-    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
+    client.createPartitions(tableMeta, metaStoreParts, ignoreIfExists)

Review comment on the line above: why?

   }

   override def dropPartitions(
HiveUtils.scala
@@ -74,8 +74,9 @@ private[spark] object HiveUtils extends Logging {

   val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
-      "<code>2.0.0</code> through <code>2.3.10</code> and " +
-      "<code>3.0.0</code> through <code>3.1.3</code>.")
+      "<code>2.0.0</code> through <code>2.3.10</code>, " +
+      "<code>3.0.0</code> through <code>3.1.3</code> and " +
+      "<code>4.0.0</code> through <code>4.0.1</code>.")
     .version("1.4.0")
     .stringConf
     .checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version")
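The expanded range can be exercised through the existing static configs; a minimal sketch, with the patch version and jar-resolution mode as illustrative choices:

```scala
import org.apache.spark.sql.SparkSession

// Static confs must be set before the first SparkSession is created.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "4.0.1")
  // Download a matching Hive client from Maven; "path" together with
  // spark.sql.hive.metastore.jars.path is the offline alternative.
  .config("spark.sql.hive.metastore.jars", "maven")
  .getOrCreate()
```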
HiveClient.scala
@@ -164,8 +164,7 @@ private[hive] trait HiveClient {
    * Create one or many partitions in the given table.
    */
   def createPartitions(
-      db: String,
-      table: String,
+      table: CatalogTable,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit
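A sketch of the new call shape, assuming a HiveClient and a partitioned table are in scope (names and the partition spec are illustrative). Passing the whole CatalogTable lets implementations derive the database and table name and build a version-appropriate Hive table object, instead of re-fetching it from two bare strings:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}

// Hypothetical usage; `client` and `externalCatalog` come from surrounding code.
val tableMeta: CatalogTable = externalCatalog.getTable("default", "events")
client.createPartitions(
  tableMeta,
  Seq(CatalogTablePartition(spec = Map("dt" -> "2024-11-11"), storage = tableMeta.storage)),
  ignoreIfExists = true)
```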
HiveClientImpl.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.hive.client

-import java.io.PrintStream
+import java.io.{OutputStream, PrintStream}
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8

@@ -121,6 +121,7 @@ private[hive] class HiveClientImpl(
       case hive.v2_3 => new Shim_v2_3()
       case hive.v3_0 => new Shim_v3_0()
       case hive.v3_1 => new Shim_v3_1()
+      case hive.v4_0 => new Shim_v4_0()
     }

   // Create an internal session state for this HiveClientImpl.

@@ -177,8 +178,10 @@ private[hive] class HiveClientImpl(
     // got changed. We reset it to clientLoader.ClassLoader here.
     state.getConf.setClassLoader(clientLoader.classLoader)
     shim.setCurrentSessionState(state)
-    state.out = new PrintStream(outputBuffer, true, UTF_8.name())
-    state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+    val clz = state.getClass.getField("out").getType.asInstanceOf[Class[_ <: PrintStream]]
+    val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String])
+    state.getClass.getField("out").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))

Review comment on the line above: build fail: "the result type of an implicit conversion must be more specific than Object"

+    state.getClass.getField("err").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))
     state
   }
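The reflective construction above is needed because the declared type of SessionState's `out`/`err` fields differs across Hive versions (a plain PrintStream in 3.x, reportedly a PrintStream subclass in 4.x), so the code looks up the field's own type at runtime and invokes its matching constructor. A self-contained sketch of the same trick; the class resolved by name stands in for the runtime-only field type:

```scala
import java.io.{ByteArrayOutputStream, OutputStream, PrintStream}
import java.nio.charset.StandardCharsets.UTF_8

// Resolve a PrintStream subtype at runtime, then call its
// (OutputStream, boolean, String) constructor reflectively.
val clz: Class[_ <: PrintStream] =
  Class.forName("java.io.PrintStream").asSubclass(classOf[PrintStream])
val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String])

val buffer = new ByteArrayOutputStream()
// Explicit boxing: Constructor.newInstance takes Object*, and a bare `true`
// can trigger the "implicit conversion must be more specific than Object"
// build failure quoted in the review comment above.
val stream: PrintStream = ctor.newInstance(buffer, Boolean.box(true), UTF_8.name())
stream.println("hello")
assert(buffer.toString(UTF_8.name()).trim == "hello")
```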

@@ -307,15 +310,27 @@ private[hive] class HiveClientImpl(
   }

   def setOut(stream: PrintStream): Unit = withHiveState {
-    state.out = stream
+    val ctor = state.getClass.getField("out")
+      .getType
+      .asInstanceOf[Class[_ <: PrintStream]]
+      .getConstructor(classOf[OutputStream])
+    state.getClass.getField("out").set(state, ctor.newInstance(stream))
   }

   def setInfo(stream: PrintStream): Unit = withHiveState {
-    state.info = stream
+    val ctor = state.getClass.getField("info")
+      .getType
+      .asInstanceOf[Class[_ <: PrintStream]]
+      .getConstructor(classOf[OutputStream])
+    state.getClass.getField("info").set(state, ctor.newInstance(stream))
   }

   def setError(stream: PrintStream): Unit = withHiveState {
-    state.err = stream
+    val ctor = state.getClass.getField("err")
+      .getType
+      .asInstanceOf[Class[_ <: PrintStream]]
+      .getConstructor(classOf[OutputStream])
+    state.getClass.getField("err").set(state, ctor.newInstance(stream))
   }

   private def setCurrentDatabaseRaw(db: String): Unit = {
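The three setters repeat one pattern; a hypothetical helper, not part of this PR, that would factor it out under the same assumption the diff already makes (each stream field's declared type has a single-OutputStream constructor):

```scala
import java.io.{OutputStream, PrintStream}

// Hypothetical refactor sketch: resolve the named public field, construct an
// instance of its declared type around `stream`, and install it.
private def setStreamField(name: String, stream: OutputStream): Unit = {
  val field = state.getClass.getField(name)
  val ctor = field.getType
    .asSubclass(classOf[PrintStream]) // checked cast, unlike asInstanceOf
    .getConstructor(classOf[OutputStream])
  field.set(state, ctor.newInstance(stream))
}
```

`asSubclass` fails fast with a ClassCastException if the field type is not a PrintStream, whereas the `asInstanceOf` casts on a `Class[_]` above are erased and would only fail later.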
@@ -629,21 +644,22 @@ private[hive] class HiveClientImpl(
   }

   override def createPartitions(
-      db: String,
-      table: String,
+      table: CatalogTable,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
     def replaceExistException(e: Throwable): Unit = e match {
       case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
-        val hiveTable = client.getTable(db, table)
+        val db = table.identifier.database.getOrElse(state.getCurrentDatabase)
+        val tableName = table.identifier.table
+        val hiveTable = client.getTable(db, tableName)
         val existingParts = parts.filter { p =>
           shim.getPartitions(client, hiveTable, p.spec.asJava).nonEmpty
         }
-        throw new PartitionsAlreadyExistException(db, table, existingParts.map(_.spec))
+        throw new PartitionsAlreadyExistException(db, tableName, existingParts.map(_.spec))
       case _ => throw e
     }
     try {
-      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+      shim.createPartitions(client, toHiveTable(table), parts, ignoreIfExists)
     } catch {
       case e: InvocationTargetException => replaceExistException(e.getCause)
       case e: Throwable => replaceExistException(e)
@@ -891,17 +907,14 @@ private[hive] class HiveClientImpl(
           results

         case _ =>
-          if (state.out != null) {
+          val out = state.getClass.getField("out").get(state)
+          if (out != null) {
             // scalastyle:off println
-            state.out.println(tokens(0) + " " + cmd_1)
+            out.asInstanceOf[PrintStream].println(tokens(0) + " " + cmd_1)
             // scalastyle:on println
           }
-          val response: CommandProcessorResponse = proc.run(cmd_1)
-          // Throw an exception if there is an error in query processing.
-          if (response.getResponseCode != 0) {
-            throw new QueryExecutionException(response.getErrorMessage)
-          }
-          Seq(response.getResponseCode.toString)
+          proc.run(cmd_1)
+          Seq("0")
       }
     } catch {
       case e: Exception =>
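The deleted response-code check and the fixed `Seq("0")` rest on a Hive 4.x behavior change: CommandProcessor.run now signals failure by throwing rather than by returning a non-zero code, so reaching the line after `run` implies success, and errors flow into the surrounding catch block. A hedged sketch of that assumed contract, with the exception type named per Hive 4.x:

```scala
// Sketch only: in Hive 4.x, run() throws on failure (CommandProcessorException),
// so a call that returns normally means response code 0.
try {
  proc.run(cmd_1)
  Seq("0")
} catch {
  case e: Exception =>
    throw new QueryExecutionException(e.getMessage)
}
```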
@@ -971,7 +984,7 @@ private[hive] class HiveClientImpl(
       partSpec,
       replace,
       numDP,
-      listBucketingEnabled = hiveTable.isStoredAsSubDirectories)
+      hiveTable)
   }

   override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {