Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging {

val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
s"<code>0.12.0</code> through <code>2.1.1</code>.")
s"<code>0.12.0</code> through <code>2.3.2</code>.")
.stringConf
.createWithDefault(builtinHiveVersion)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
Expand Down Expand Up @@ -104,6 +103,8 @@ private[hive] class HiveClientImpl(
case hive.v1_2 => new Shim_v1_2()
case hive.v2_0 => new Shim_v2_0()
case hive.v2_1 => new Shim_v2_1()
case hive.v2_2 => new Shim_v2_2()
case hive.v2_3 => new Shim_v2_3()
}

// Create an internal session state for this HiveClientImpl.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {

}

private[client] class Shim_v1_0 extends Shim_v0_14 {

}
private[client] class Shim_v1_0 extends Shim_v0_14

private[client] class Shim_v1_1 extends Shim_v1_0 {

Expand Down Expand Up @@ -1146,3 +1144,7 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
}
}

// Shim for Hive 2.2 clients: adds no overrides, so all behavior is inherited from Shim_v2_1.
private[client] class Shim_v2_2 extends Shim_v2_1

// Shim for Hive 2.3 clients: extends Shim_v2_1 directly (not Shim_v2_2) and adds no overrides.
private[client] class Shim_v2_3 extends Shim_v2_1
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ private[hive] object IsolatedClientLoader extends Logging {
case "1.2" | "1.2.0" | "1.2.1" | "1.2.2" => hive.v1_2
case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
case "2.2" | "2.2.0" => hive.v2_2
case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" => hive.v2_3
}

private def downloadVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,15 @@ package object client {
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))

val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
// Hive 2.2 line, registered with version string "2.2.0". The exclusions list names the
// curator and pentaho-aggdesigner artifacts; presumably these are skipped when the Hive
// jars are resolved -- TODO confirm against the HiveVersion definition.
case object v2_2 extends HiveVersion("2.2.0",
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))

// Hive 2.3 line, registered with version string "2.3.2". The exclusions list names the
// curator and pentaho-aggdesigner artifacts; presumably these are skipped when the Hive
// jars are resolved -- TODO confirm against the HiveVersion definition.
case object v2_3 extends HiveVersion("2.3.2",
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))

val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
}
// scalastyle:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
// staging directory under the table directory for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)

// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ import scala.collection.immutable.IndexedSeq
import org.apache.spark.SparkFunSuite

private[client] trait HiveClientVersions {
protected val versions = IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
protected val versions =
IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1") {
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
Expand Down Expand Up @@ -110,7 +111,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}

private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
private val versions =
Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")

private var client: HiveClient = null

Expand All @@ -125,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1") {
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
Expand Down Expand Up @@ -422,15 +424,18 @@ class VersionsSuite extends SparkFunSuite with Logging {

test(s"$version: alterPartitions") {
val spec = Map("key1" -> "1", "key2" -> "2")
val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
val storage = storageFormat.copy(
locationUri = Some(newLocation),
// needed for 0.12 alter partitions
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
val partition = CatalogTablePartition(spec, storage)
val partition = CatalogTablePartition(spec, storage, parameters)
client.alterPartitions("default", "src_part", Seq(partition))
assert(client.getPartition("default", "src_part", spec)
.storage.locationUri == Some(newLocation))
assert(client.getPartition("default", "src_part", spec)
.parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0"))
}

test(s"$version: dropPartitions") {
Expand Down Expand Up @@ -633,6 +638,46 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
}

// Checks the partition parameters keyed by StatsSetupConst.TOTAL_SIZE and
// StatsSetupConst.NUM_FILES for partition ds='2' of table `tbl`, first after an
// INSERT OVERWRITE and then again after ALTER TABLE ... SET SERDEPROPERTIES.
// NOTE(review): despite the test name, the table is built with a plain CREATE TABLE
// followed by INSERT OVERWRITE; no CREATE TABLE AS SELECT statement is issued here.
test(s"$version: CREATE Partitioned TABLE AS SELECT") {
withTable("tbl") {
versionSpark.sql(
"""
|CREATE TABLE tbl(c1 string)
|PARTITIONED BY (ds STRING)
""".stripMargin)
versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")

// The single inserted row must read back as (c1 = "1", ds = "2").
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row("1", "2")))
// Read the partition's parameter map from the session catalog right after the insert.
val partMeta = versionSpark.sessionState.catalog.getPartition(
TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
val totalSize = partMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
val numFiles = partMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
// Hive 0.12 is the exception: both statistics are expected to be absent there.
// Every other tested version is expected to have both populated.
if (version == "0.12") {
assert(totalSize.isEmpty && numFiles.isEmpty)
} else {
assert(totalSize.nonEmpty && numFiles.nonEmpty)
}

// Alter the partition's serde properties, then re-read its parameters.
versionSpark.sql(
"""
|ALTER TABLE tbl PARTITION (ds='2')
|SET SERDEPROPERTIES ('newKey' = 'vvv')
""".stripMargin)
val newPartMeta = versionSpark.sessionState.catalog.getPartition(
TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters

val newTotalSize = newPartMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
val newNumFiles = newPartMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
// Same expectation after the ALTER: still absent on 0.12, still present on all other
// versions. Only presence is checked; the values themselves are not compared.
if (version == "0.12") {
assert(newTotalSize.isEmpty && newNumFiles.isEmpty)
} else {
assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty)
}
}
}

test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
Expand Down