2 changes: 1 addition & 1 deletion docs/sql-data-sources-hive-tables.md

@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
<td><code>2.3.9</code></td>
<td>
Version of the Hive metastore. Available
-    options are <code>0.12.0</code> through <code>2.3.9</code> and <code>3.0.0</code> through <code>3.1.3</code>.
+    options are <code>0.12.0</code> through <code>2.3.9</code> and <code>3.0.0</code> through <code>4.0.0</code>.
</td>
<td>1.4.0</td>
</tr>
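For context, a hedged sketch of what the documented option looks like in practice: a session pinned to a Hive 4.0.0 metastore. Both configuration keys are existing Spark settings; the value `maven` asks Spark to download the matching Hive client jars at runtime. The application name is illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Build a session against a Hive 4.0.0 metastore. Both configs are static,
// so they must be set before the session starts.
val spark = SparkSession.builder()
  .appName("hive-metastore-4.0-example")
  .config("spark.sql.hive.metastore.version", "4.0.0")
  .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()
```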
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

@@ -121,6 +121,7 @@ private[hive] class HiveClientImpl(
case hive.v2_3 => new Shim_v2_3()
case hive.v3_0 => new Shim_v3_0()
case hive.v3_1 => new Shim_v3_1()
+    case hive.v4_0 => new Shim_v4_0()
}

// Create an internal session state for this HiveClientImpl.
@@ -176,8 +177,26 @@ private[hive] class HiveClientImpl(
// got changed. We reset it to clientLoader.ClassLoader here.
state.getConf.setClassLoader(clientLoader.classLoader)
shim.setCurrentSessionState(state)
-    state.out = new PrintStream(outputBuffer, true, UTF_8.name())
-    state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+    // Hive 4.0 uses org.apache.hadoop.hive.common.io.SessionStream instead of PrintStream
+    // (see HIVE-21033). A new SessionStream instance must be created via reflection, because
+    // the class was introduced by the same change (HIVE-21033) and is not available in any
+    // earlier Hive version.
+    if (Utils.classIsLoadable("org.apache.hadoop.hive.common.io.SessionStream")) {
+      val sessionStreamCtor = Utils
+        .classForName[Any]("org.apache.hadoop.hive.common.io.SessionStream")
+        .getConstructor(classOf[java.io.OutputStream])
+      val sessionStateClass =
+        Utils.classForName[SessionState](classOf[SessionState].getName)
+      val outField = sessionStateClass.getDeclaredField("out")
+      outField.set(state,
+        sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name())))
+      val errField = sessionStateClass.getDeclaredField("err")
+      errField.set(state,
+        sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name())))
+    } else {
+      state.out = new PrintStream(outputBuffer, true, UTF_8.name())
+      state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+    }
state
}

@@ -858,7 +877,7 @@ private[hive] class HiveClientImpl(
    // Since HIVE-18238 (Hive 3.0.0), the Driver.close function's return type changed
    // and the CommandProcessorFactory.clean function was removed.
driver.getClass.getMethod("close").invoke(driver)
-    if (version != hive.v3_0 && version != hive.v3_1) {
+    if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) {
CommandProcessorFactory.clean(conf)
}
}
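The classpath-conditional reflection above reduces to a small reusable pattern. Here is a minimal self-contained sketch, with plain `java.lang.Class` standing in for Spark's `Utils` helpers (the helper name is illustrative):

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Instantiate a class reflectively only when it is present on the classpath,
// mirroring the SessionStream handling in the hunk above.
def newInstanceIfLoadable(className: String, out: OutputStream): Option[Any] =
  try {
    val ctor = Class.forName(className).getConstructor(classOf[OutputStream])
    Some(ctor.newInstance(out))
  } catch {
    case _: ReflectiveOperationException => None // class or constructor missing
  }

// Returns Some(...) on a Hive 4.0 classpath and None on older ones.
val maybeStream = newInstanceIfLoadable(
  "org.apache.hadoop.hive.common.io.SessionStream", new ByteArrayOutputStream())
```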
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

@@ -700,6 +700,10 @@ private class Shim_v0_12 extends Shim with Logging {

private class Shim_v0_13 extends Shim_v0_12 {

+  // Hive CHAR and VARCHAR partition keys are treated as Catalyst strings, but filters on
+  // them cannot be pushed down before HIVE-26661 (fixed in Hive 4.0).
+  protected val charVarcharPartitionKeyPushDownSupported = false

private lazy val setCurrentSessionStateMethod =
findStaticMethod(
classOf[SessionState],
@@ -947,15 +951,15 @@ private class Shim_v0_13 extends Shim_v0_12 {
}

object SupportedAttribute {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    private val varcharKeys = table.getPartitionKeys.asScala
+    private lazy val charVarcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet

def unapply(attr: Attribute): Option[String] = {
val resolver = SQLConf.get.resolver
-      if (varcharKeys.exists(c => resolver(c, attr.name))) {
+      if (!charVarcharPartitionKeyPushDownSupported &&
+          charVarcharKeys.exists(c => resolver(c, attr.name))) {
None
} else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType ||
attr.dataType == DateType) {
@@ -1781,3 +1785,9 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
}

private[client] class Shim_v3_1 extends Shim_v3_0

+private[client] class Shim_v4_0 extends Shim_v3_1 {
+
+  override protected val charVarcharPartitionKeyPushDownSupported = true
+
+}
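A toy model of how the new flag changes the pushdown decision. The parameter names echo the shims above; everything else is illustrative, not the real `SupportedAttribute` code:

```scala
// Pre-4.0 shims inherit the flag as false and refuse to push filters on
// CHAR/VARCHAR partition keys; Shim_v4_0 flips it, so the key check is skipped.
def pushableAttribute(
    name: String,
    charVarcharKeys: Set[String],
    pushDownSupported: Boolean): Option[String] = {
  if (!pushDownSupported && charVarcharKeys.contains(name)) None else Some(name)
}

// Example: a "region VARCHAR(10)" partition column.
assert(pushableAttribute("region", Set("region"), pushDownSupported = false).isEmpty)
assert(pushableAttribute("region", Set("region"), pushDownSupported = true).contains("region"))
```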
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

@@ -96,6 +96,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case (2, 3, _) => Some(hive.v2_3)
case (3, 0, _) => Some(hive.v3_0)
case (3, 1, _) => Some(hive.v3_1)
+      case (4, 0, _) => Some(hive.v4_0)
case _ => None
}.getOrElse {
throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError(
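The match above (truncated here) keys on the parsed `(major, minor, micro)` version tuple. A self-contained sketch of that resolution step, with the enumeration simplified to the versions this diff touches (the parsing helper is illustrative):

```scala
// Map a metastore version string to a supported (major, minor) shim key.
def shimKey(version: String): Option[(Int, Int)] = {
  val parts = version.split("\\.").take(2)
  if (parts.length == 2 && parts.forall(p => p.nonEmpty && p.forall(_.isDigit))) {
    (parts(0).toInt, parts(1).toInt) match {
      case k @ ((3, 0) | (3, 1) | (4, 0)) => Some(k)
      case _ => None
    }
  } else None
}

assert(shimKey("4.0.0").contains((4, 0)))
assert(shimKey("4.1.0").isEmpty) // unsupported, like the getOrElse branch above
```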
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala

@@ -89,8 +89,15 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))

+  // Since HIVE-14496, Hive.java uses calcite-core
+  case object v4_0 extends HiveVersion("4.0.0",
+    extraDeps = Seq("org.apache.derby:derby:10.14.2.0"),
+    exclusions = Seq("org.apache.calcite:calcite-druid",
+      "org.apache.curator:*",
+      "org.apache.hive:hive-vector-code-gen"))

val allSupportedHiveVersions: Set[HiveVersion] =
-    Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
+    Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1, v4_0)
}
// scalastyle:on

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala

@@ -166,7 +166,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
// test alter database location
val tempDatabasePath2 = Utils.createTempDir().toURI
// Hive support altering database location since HIVE-8472.
if (version == "3.0" || version == "3.1") {
if (version == "3.0" || version == "3.1" || version == "4.0") {
client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
val uriInCatalog = client.getDatabase("temporary").locationUri
assert("file" === uriInCatalog.getScheme)
@@ -653,7 +653,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])

test("sql create index and reset") {
// HIVE-18448 Since Hive 3.0, INDEX is not supported.
if (version != "3.0" && version != "3.1") {
if (version != "3.0" && version != "3.1" && version != "4.0") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
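As the `version == "x.y"` comparisons in these tests grow by one disjunct per release, a hedged alternative is a numeric guard like this sketch (the helper name is illustrative):

```scala
// Compare "major.minor" test-version strings numerically instead of enumerating them.
def atLeast(version: String, major: Int, minor: Int): Boolean = {
  val Array(maj, min) = version.split("\\.").take(2).map(_.toInt)
  maj > major || (maj == major && min >= minor)
}

// The HIVE-8472 guard above would then read: if (atLeast(version, 3, 0)) { ... }
assert(atLeast("4.0", 3, 0) && !atLeast("2.3", 3, 0))
```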
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala

@@ -24,6 +24,6 @@ private[client] trait HiveClientVersions {
protected val versions = if (testVersions.nonEmpty) {
testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq
} else {
IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1")
IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0")
}
}
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala

@@ -33,12 +33,12 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
version == "3.0" || version == "3.1") {
version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
if (version == "3.0" || version == "3.1") {
if (version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("hive.in.test", "true")
hadoopConf.set("hive.query.reexecution.enabled", "false")
}