Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-45265][SQL] Supporting Hive 4.0 Metastore
  • Loading branch information
attilapiros authored and dongjoon-hyun committed Apr 2, 2024
commit d80aaf825bb97f27f6be0dc7eecddaf912c7c46a
2 changes: 1 addition & 1 deletion docs/sql-data-sources-hive-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
<td><code>2.3.9</code></td>
<td>
Version of the Hive metastore. Available
options are <code>2.0.0</code> through <code>2.3.9</code> and <code>3.0.0</code> through <code>3.1.3</code>.
options are <code>2.0.0</code> through <code>2.3.9</code>, <code>3.0.0</code> through <code>3.1.3</code>, and <code>4.0.0</code>.
</td>
<td>1.4.0</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ private[spark] object HiveUtils extends Logging {

val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
"<code>2.0.0</code> through <code>2.3.9</code> and " +
"<code>3.0.0</code> through <code>3.1.3</code>.")
"<code>2.0.0</code> through <code>2.3.9</code>, " +
"<code>3.0.0</code> through <code>3.1.3</code> and " +
"<code>4.0.0</code>.")
.version("1.4.0")
.stringConf
.checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ private[hive] class HiveClientImpl(
case hive.v2_3 => new Shim_v2_3()
case hive.v3_0 => new Shim_v3_0()
case hive.v3_1 => new Shim_v3_1()
case hive.v4_0 => new Shim_v4_0()
}

// Create an internal session state for this HiveClientImpl.
Expand Down Expand Up @@ -175,8 +176,26 @@ private[hive] class HiveClientImpl(
// got changed. We reset it to clientLoader.ClassLoader here.
state.getConf.setClassLoader(clientLoader.classLoader)
shim.setCurrentSessionState(state)
state.out = new PrintStream(outputBuffer, true, UTF_8.name())
state.err = new PrintStream(outputBuffer, true, UTF_8.name())
// Hive 4.0 uses org.apache.hadoop.hive.common.io.SessionStream instead of PrintStream
// (see HIVE-21033). For creating a new SessionStream instance reflection must be used
// as this class also was introduced by the same change (HIVE-21033)
// and it is not available in any eariler Hive version
if (Utils.classIsLoadable("org.apache.hadoop.hive.common.io.SessionStream")) {
val sessionStreamCtor = Utils
.classForName[Any]("org.apache.hadoop.hive.common.io.SessionStream")
.getConstructor(classOf[java.io.OutputStream])
val sessionStateClass =
Utils.classForName[SessionState](classOf[SessionState].getName)
val outField = sessionStateClass.getDeclaredField("out")
outField.set(state,
sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name())))
val errField = sessionStateClass.getDeclaredField("err")
errField.set(state,
sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name())))
} else {
state.out = new PrintStream(outputBuffer, true, UTF_8.name())
state.err = new PrintStream(outputBuffer, true, UTF_8.name())
}
state
}

Expand Down Expand Up @@ -857,7 +876,7 @@ private[hive] class HiveClientImpl(
// Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
// and the CommandProcessorFactory.clean function removed.
driver.getClass.getMethod("close").invoke(driver)
if (version != hive.v3_0 && version != hive.v3_1) {
if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) {
CommandProcessorFactory.clean(conf)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ private[client] class Shim_v2_0 extends Shim with Logging {
protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
// txnId can be 0 unless isAcid == true
protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
// Hive CHAR and VARCHAR are treated as catalyst strings but they cannot be pushed down
// before HIVE-26661 (which was introduced in Hive version 4.0)
protected val charVarcharPartionKeyPushDownSupported = false

override def getMSC(hive: Hive): IMetaStoreClient = hive.getMSC

Expand Down Expand Up @@ -723,15 +726,15 @@ private[client] class Shim_v2_0 extends Shim with Logging {
}

object SupportedAttribute {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
private val varcharKeys = table.getPartitionKeys.asScala
private lazy val charVarcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet

def unapply(attr: Attribute): Option[String] = {
val resolver = SQLConf.get.resolver
if (varcharKeys.exists(c => resolver(c, attr.name))) {
if (!charVarcharPartionKeyPushDownSupported &&
charVarcharKeys.exists(c => resolver(c, attr.name))) {
None
} else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType ||
attr.dataType == DateType) {
Expand Down Expand Up @@ -1277,3 +1280,7 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
}

private[client] class Shim_v3_1 extends Shim_v3_0

private[client] class Shim_v4_0 extends Shim_v3_1 {
override protected val charVarcharPartionKeyPushDownSupported = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case (2, 3, _) => Some(hive.v2_3)
case (3, 0, _) => Some(hive.v3_0)
case (3, 1, _) => Some(hive.v3_1)
case (4, 0, _) => Some(hive.v4_0)
case _ => None
}.getOrElse {
throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,14 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))

case object v4_0 extends HiveVersion("4.0.0",
extraDeps = Seq("org.apache.derby:derby:10.14.2.0"),
exclusions = Seq("org.apache.calcite:calcite-druid",
"org.apache.curator:*",
"org.apache.hive:hive-vector-code-gen"))

val allSupportedHiveVersions: Set[HiveVersion] =
Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1, v4_0)
}
// scalastyle:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {

test("sql create index and reset") {
// HIVE-18448 Since Hive 3.0, INDEX is not supported.
if (version != "3.0" && version != "3.1") {
if (version != "3.0" && version != "3.1" && version != "4.0") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ private[client] trait HiveClientVersions {
protected val versions = if (testVersions.nonEmpty) {
testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq
} else {
IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1")
IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
version == "3.0" || version == "3.1") {
version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
if (version == "3.0" || version == "3.1") {
if (version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("hive.in.test", "true")
hadoopConf.set("hive.query.reexecution.enabled", "false")
}
Expand Down