diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index b51cde53bd8f..30e62c591618 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
- options are 2.0.0 through 2.3.9 and 3.0.0 through 3.1.3.
+ options are 2.0.0 through 2.3.9, 3.0.0 through 3.1.3, and 4.0.0.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ ... @@
- "2.0.0 through 2.3.9 and " +
- "3.0.0 through 3.1.3.")
+ "2.0.0 through 2.3.9, " +
+ "3.0.0 through 3.1.3 and " +
+ "4.0.0.")
.version("1.4.0")
.stringConf
.checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 46dc56372334..d95a06c08d7d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -120,6 +120,7 @@ private[hive] class HiveClientImpl(
case hive.v2_3 => new Shim_v2_3()
case hive.v3_0 => new Shim_v3_0()
case hive.v3_1 => new Shim_v3_1()
+ case hive.v4_0 => new Shim_v4_0()
}
// Create an internal session state for this HiveClientImpl.
@@ -175,8 +176,26 @@ private[hive] class HiveClientImpl(
// got changed. We reset it to clientLoader.ClassLoader here.
state.getConf.setClassLoader(clientLoader.classLoader)
shim.setCurrentSessionState(state)
- state.out = new PrintStream(outputBuffer, true, UTF_8.name())
- state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+ // Hive 4.0 uses org.apache.hadoop.hive.common.io.SessionStream instead of PrintStream
+ // (see HIVE-21033). Reflection must be used to create a new SessionStream instance,
+ // because the class was also introduced by HIVE-21033 and is not available in any
+ // earlier Hive version.
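+ // HIVE-21033 also changed the declared type of the SessionState out/err fields to
+ // SessionStream, so a direct assignment compiled against an older Hive would fail to
+ // link at runtime with Hive 4.0 jars; the fields are therefore set reflectively as well.
+ // Both fields are public, so no setAccessible call is needed.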
+ if (Utils.classIsLoadable("org.apache.hadoop.hive.common.io.SessionStream")) {
+ val sessionStreamCtor = Utils
+ .classForName[Any]("org.apache.hadoop.hive.common.io.SessionStream")
+ .getConstructor(classOf[java.io.OutputStream])
+ val sessionStateClass =
+ Utils.classForName[SessionState](classOf[SessionState].getName)
+ val outField = sessionStateClass.getDeclaredField("out")
+ outField.set(state,
+ sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name())))
+ val errField = sessionStateClass.getDeclaredField("err")
+ errField.set(state,
+ sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name())))
+ } else {
+ state.out = new PrintStream(outputBuffer, true, UTF_8.name())
+ state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+ }
state
}
@@ -857,7 +876,7 @@ private[hive] class HiveClientImpl(
// Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
// and the CommandProcessorFactory.clean function removed.
driver.getClass.getMethod("close").invoke(driver)
- if (version != hive.v3_0 && version != hive.v3_1) {
+ if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) {
CommandProcessorFactory.clean(conf)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9943c0178fcf..0e662bb9a2a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -256,6 +256,9 @@ private[client] class Shim_v2_0 extends Shim with Logging {
protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
// txnId can be 0 unless isAcid == true
protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
+ // Hive CHAR and VARCHAR types are treated as Catalyst strings, but filters on them
+ // cannot be pushed down to the metastore before HIVE-26661 (fixed in Hive 4.0).
+ protected val charVarcharPartitionKeyPushDownSupported = false
override def getMSC(hive: Hive): IMetaStoreClient = hive.getMSC
@@ -723,15 +726,15 @@ private[client] class Shim_v2_0 extends Shim with Logging {
}
object SupportedAttribute {
- // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
- private val varcharKeys = table.getPartitionKeys.asScala
+ private lazy val charVarcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet
def unapply(attr: Attribute): Option[String] = {
val resolver = SQLConf.get.resolver
- if (varcharKeys.exists(c => resolver(c, attr.name))) {
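+ // Returning None keeps the predicate out of the metastore filter string; Spark then
+ // falls back to fetching the partitions and filtering them on its own side.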
+ if (!charVarcharPartitionKeyPushDownSupported &&
+ charVarcharKeys.exists(c => resolver(c, attr.name))) {
None
} else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType ||
attr.dataType == DateType) {
@@ -1277,3 +1280,89 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
}
private[client] class Shim_v3_1 extends Shim_v3_0
+
+private[client] class Shim_v4_0 extends Shim_v3_1 {
+ override protected val charVarcharPartitionKeyPushDownSupported = true
+
+ private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+ "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+ private lazy val clazzAlterTableAddPartitionDesc = getClass.getClassLoader.loadClass(
+ "org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc")
+
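+ // Both methods below are resolved reflectively because these Hive 4.0 signatures differ
+ // from the ones in the Hive version Spark is compiled against.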
+ private lazy val alterTableMethod =
+ findMethod(
+ classOf[Hive],
+ "alterTable",
+ classOf[String],
+ classOf[Table],
+ classOf[EnvironmentContext],
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ clazzLoadFileType,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JLong],
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+
+ // HIVE-19975 changed alterTable by adding transactional
+ override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
+ recordHiveCall()
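+ // transactional = false presumably matches the pre-HIVE-19975 alterTable behavior.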
+ val transactional = false
+ alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable,
+ transactional: JBoolean)
+ }
+
+ // HIVE-21078 changed loadTable by removing hasFollowingStatsTask and adding resetStatistics
+ // HIVE-21164 changed loadTable by adding isDirectInsert
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ val loadFileType = if (replace) {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ recordHiveCall()
+ val resetStatistics = false
+ val isDirectInsert = false
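+ // As in Shim_v3_0, `replace` also maps to the isInsertOverwrite parameter of loadTable.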
+ loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
+ isSkewedStoreAsSubdir, isAcidIUDoperation, resetStatistics,
+ writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean,
+ isDirectInsert: JBoolean)
+ }
+
+ // HIVE-21703 renamed AddPartitionDesc to AlterTableAddPartitionDesc. The new class only
+ // exists in Hive 4.0, so it can only be reached through clazzAlterTableAddPartitionDesc
+ // via reflection, and the AddPartitionDesc based implementation inherited from Shim_v2_0
+ // no longer compiles against it.
+ // TODO: port the old implementation (sketched below) to the new API. Until then, fail
+ // loudly instead of silently dropping the requested partitions:
+ //
+ // val addPartitionDesc = new AlterTableAddPartitionDesc(dbName, tableName, ignoreIfExists)
+ // parts.zipWithIndex.foreach { case (s, i) =>
+ //   addPartitionDesc.addPartition(
+ //     s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString).orNull)
+ //   if (s.parameters.nonEmpty) {
+ //     addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+ //   }
+ // }
+ // hive.createPartitions(addPartitionDesc)
+ override def createPartitions(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ throw new UnsupportedOperationException(
+ "createPartitions is not implemented for the Hive 4.0 shim yet")
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 5297910cbfa4..669fde6a72bf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -95,6 +95,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case (2, 3, _) => Some(hive.v2_3)
case (3, 0, _) => Some(hive.v3_0)
case (3, 1, _) => Some(hive.v3_1)
+ case (4, 0, _) => Some(hive.v4_0)
case _ => None
}.getOrElse {
throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 564c87a0fca8..d84e2d674580 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -88,8 +88,22 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))
+ // Since Hive 4.0, hadoop-hdfs, DataNucleus and Spring Framework dependencies are
+ // additionally required.
+ case object v4_0 extends HiveVersion("4.0.0",
+ extraDeps = Seq("org.apache.hadoop:hadoop-hdfs:3.3.6",
+ "org.datanucleus:datanucleus-api-jdo:5.2.8",
+ "org.datanucleus:datanucleus-rdbms:5.2.10",
+ "org.datanucleus:javax.jdo:3.2.0-release",
+ "org.springframework:spring-core:5.3.21",
+ "org.springframework:spring-jdbc:5.3.21",
+ "org.antlr:antlr4-runtime:4.9.3",
+ "org.apache.derby:derby:10.14.2.0"),
+ exclusions = Seq("org.apache.calcite:calcite-druid",
+ "org.apache.curator:*",
+ "org.apache.hive:hive-vector-code-gen"))
+
val allSupportedHiveVersions: Set[HiveVersion] =
- Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
+ Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1, v4_0)
}
// scalastyle:on
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 0bc288501a01..9ae613f86622 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -163,7 +163,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
// test alter database location
val tempDatabasePath2 = Utils.createTempDir().toURI
// Hive support altering database location since HIVE-8472.
- if (version == "3.0" || version == "3.1") {
+ if (version == "3.0" || version == "3.1" || version == "4.0") {
client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
val uriInCatalog = client.getDatabase("temporary").locationUri
assert("file" === uriInCatalog.getScheme)
@@ -558,7 +558,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
test("sql create index and reset") {
// HIVE-18448 Since Hive 3.0, INDEX is not supported.
- if (version != "3.0" && version != "3.1") {
+ if (version != "3.0" && version != "3.1" && version != "4.0") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 1dee9e6dcfc8..061986ef9324 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -24,6 +24,6 @@ private[client] trait HiveClientVersions {
protected val versions = if (testVersions.nonEmpty) {
testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq
} else {
- IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1")
+ IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index 5db0b4f18c96..00aefe26e035 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -32,13 +32,13 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
- version == "3.0" || version == "3.1") {
+ version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
- if (version == "3.0" || version == "3.1") {
+ if (version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("hive.in.test", "true")
hadoopConf.set("hive.query.reexecution.enabled", "false")
}