Skip to content

Commit 9bd39df

Browse files
wangyumtkakantousis
authored andcommitted
[SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metastore
## What changes were proposed in this pull request? This is based on apache#20668 for supporting Hive 2.2 and Hive 2.3 metastore. When we merge the PR, we should give the major credit to wangyum ## How was this patch tested? Added the test cases Author: Yuming Wang <yumwang@ebay.com> Author: gatorsmile <gatorsmile@gmail.com> Closes apache#20671 from gatorsmile/pr-20668.
1 parent 3982092 commit 9bd39df

File tree

9 files changed

+72
-12
lines changed

9 files changed

+72
-12
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging {
6262

6363
val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
6464
.doc("Version of the Hive metastore. Available options are " +
65-
s"<code>0.12.0</code> through <code>2.1.1</code>.")
65+
s"<code>0.12.0</code> through <code>2.3.2</code>.")
6666
.stringConf
6767
.createWithDefault(builtinHiveVersion)
6868

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
2424
import scala.collection.mutable
2525
import scala.collection.mutable.ArrayBuffer
2626

27-
import org.apache.hadoop.conf.Configuration
2827
import org.apache.hadoop.fs.Path
2928
import org.apache.hadoop.hive.common.StatsSetupConst
3029
import org.apache.hadoop.hive.conf.HiveConf
@@ -102,6 +101,8 @@ private[hive] class HiveClientImpl(
102101
case hive.v1_2 => new Shim_v1_2()
103102
case hive.v2_0 => new Shim_v2_0()
104103
case hive.v2_1 => new Shim_v2_1()
104+
case hive.v2_2 => new Shim_v2_2()
105+
case hive.v2_3 => new Shim_v2_3()
105106
}
106107

107108
// Create an internal session state for this HiveClientImpl.

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -880,9 +880,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
880880

881881
}
882882

883-
private[client] class Shim_v1_0 extends Shim_v0_14 {
884-
885-
}
883+
private[client] class Shim_v1_0 extends Shim_v0_14
886884

887885
private[client] class Shim_v1_1 extends Shim_v1_0 {
888886

@@ -1146,3 +1144,7 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
11461144
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
11471145
}
11481146
}
1147+
1148+
private[client] class Shim_v2_2 extends Shim_v2_1
1149+
1150+
private[client] class Shim_v2_3 extends Shim_v2_1

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ private[hive] object IsolatedClientLoader extends Logging {
9595
case "1.2" | "1.2.0" | "1.2.1" | "1.2.2" => hive.v1_2
9696
case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
9797
case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
98+
case "2.2" | "2.2.0" => hive.v2_2
99+
case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" => hive.v2_3
98100
}
99101

100102
private def downloadVersion(

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,15 @@ package object client {
7171
exclusions = Seq("org.apache.curator:*",
7272
"org.pentaho:pentaho-aggdesigner-algorithm"))
7373

74-
val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
74+
case object v2_2 extends HiveVersion("2.2.0",
75+
exclusions = Seq("org.apache.curator:*",
76+
"org.pentaho:pentaho-aggdesigner-algorithm"))
77+
78+
case object v2_3 extends HiveVersion("2.3.2",
79+
exclusions = Seq("org.apache.curator:*",
80+
"org.pentaho:pentaho-aggdesigner-algorithm"))
81+
82+
val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
7583
}
7684
// scalastyle:on
7785

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
114114
// staging directory under the table director for Hive prior to 1.1, the staging directory will
115115
// be removed by Hive when Hive is trying to empty the table directory.
116116
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
117-
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
117+
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
118+
Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
118119

119120
// Ensure all the supported versions are considered here.
120121
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ import scala.collection.immutable.IndexedSeq
2222
import org.apache.spark.SparkFunSuite
2323

2424
private[client] trait HiveClientVersions {
25-
protected val versions = IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
25+
protected val versions =
26+
IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
2627
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
3131
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
3232
// hive.metastore.schema.verification from false to true since 2.0
3333
// For details, see the JIRA HIVE-6113 and HIVE-12463
34-
if (version == "2.0" || version == "2.1") {
34+
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
3535
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
3636
hadoopConf.set("hive.metastore.schema.verification", "false")
3737
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
2121
import java.net.URI
2222

2323
import org.apache.hadoop.conf.Configuration
24+
import org.apache.hadoop.hive.common.StatsSetupConst
2425
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
2526
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
2627
import org.apache.hadoop.mapred.TextInputFormat
@@ -108,7 +109,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
108109
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
109110
}
110111

111-
private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
112+
private val versions =
113+
Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
112114

113115
private var client: HiveClient = null
114116

@@ -123,7 +125,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
123125
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
124126
// hive.metastore.schema.verification from false to true since 2.0
125127
// For details, see the JIRA HIVE-6113 and HIVE-12463
126-
if (version == "2.0" || version == "2.1") {
128+
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
127129
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
128130
hadoopConf.set("hive.metastore.schema.verification", "false")
129131
}
@@ -420,15 +422,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
420422

421423
test(s"$version: alterPartitions") {
422424
val spec = Map("key1" -> "1", "key2" -> "2")
425+
val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
423426
val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
424427
val storage = storageFormat.copy(
425428
locationUri = Some(newLocation),
426429
// needed for 0.12 alter partitions
427430
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
428-
val partition = CatalogTablePartition(spec, storage)
431+
val partition = CatalogTablePartition(spec, storage, parameters)
429432
client.alterPartitions("default", "src_part", Seq(partition))
430433
assert(client.getPartition("default", "src_part", spec)
431434
.storage.locationUri == Some(newLocation))
435+
assert(client.getPartition("default", "src_part", spec)
436+
.parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0"))
432437
}
433438

434439
test(s"$version: dropPartitions") {
@@ -631,6 +636,46 @@ class VersionsSuite extends SparkFunSuite with Logging {
631636
}
632637
}
633638

639+
test(s"$version: CREATE Partitioned TABLE AS SELECT") {
640+
withTable("tbl") {
641+
versionSpark.sql(
642+
"""
643+
|CREATE TABLE tbl(c1 string)
644+
|PARTITIONED BY (ds STRING)
645+
""".stripMargin)
646+
versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")
647+
648+
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row("1", "2")))
649+
val partMeta = versionSpark.sessionState.catalog.getPartition(
650+
TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
651+
val totalSize = partMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
652+
val numFiles = partMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
653+
// Except 0.12, all the following versions will fill the Hive-generated statistics
654+
if (version == "0.12") {
655+
assert(totalSize.isEmpty && numFiles.isEmpty)
656+
} else {
657+
assert(totalSize.nonEmpty && numFiles.nonEmpty)
658+
}
659+
660+
versionSpark.sql(
661+
"""
662+
|ALTER TABLE tbl PARTITION (ds='2')
663+
|SET SERDEPROPERTIES ('newKey' = 'vvv')
664+
""".stripMargin)
665+
val newPartMeta = versionSpark.sessionState.catalog.getPartition(
666+
TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
667+
668+
val newTotalSize = newPartMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
669+
val newNumFiles = newPartMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
670+
// Except 0.12, all the following versions will fill the Hive-generated statistics
671+
if (version == "0.12") {
672+
assert(newTotalSize.isEmpty && newNumFiles.isEmpty)
673+
} else {
674+
assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty)
675+
}
676+
}
677+
}
678+
634679
test(s"$version: Delete the temporary staging directory and files after each insert") {
635680
withTempDir { tmpDir =>
636681
withTable("tab") {

0 commit comments

Comments
 (0)