diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 2b12c35d18e2..3f54e875f7eb 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -91,8 +91,6 @@ jackson-core-2.6.7.jar
 jackson-core-asl-1.9.13.jar
 jackson-databind-2.6.7.1.jar
 jackson-dataformat-yaml-2.6.7.jar
-jackson-jaxrs-base-2.7.8.jar
-jackson-jaxrs-json-provider-2.7.8.jar
 jackson-mapper-asl-1.9.13.jar
 jackson-module-jaxb-annotations-2.6.7.jar
 jackson-module-paranamer-2.7.9.jar
diff --git a/pom.xml b/pom.xml
index 099a08185d2a..a0a4b58ea300 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1075,6 +1075,10 @@
         <exclusion>
           <groupId>com.sun.jersey.contribs</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.jaxrs</groupId>
+          <artifactId>jackson-jaxrs-json-provider</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a5ed9088eaa4..b298d7b2df44 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -323,7 +323,7 @@ object SparkBuild extends PomBuild {
   // Note ordering of these settings matter.
   /* Enable shared settings on all projects */
   (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ copyJarsProjects ++ Seq(spark, tools))
-    .foreach(enable(sharedSettings ++ DependencyOverrides.settings ++
+    .foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ ExcludeDependencies.settings ++
       ExcludedDependencies.settings ++ Checkstyle.settings))

   /* Enable tests settings for all projects except examples, assembly and tools */
@@ -471,7 +471,20 @@ object DockerIntegrationTests {
 object DependencyOverrides {
   lazy val settings = Seq(
     dependencyOverrides += "com.google.guava" % "guava" % "14.0.1",
-    dependencyOverrides += "jline" % "jline" % "2.14.6")
+    dependencyOverrides += "jline" % "jline" % "2.14.6",
+    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.7",
+    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7",
+    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-module-jaxb-annotations" % "2.6.7",
+    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7")
+}
+
+/**
+ * Exclusions to work around sbt's dependency resolution being different from Maven's.
+ */
+object ExcludeDependencies {
+  lazy val settings = Seq(
+    excludeDependencies += "com.fasterxml.jackson.jaxrs" % "jackson-jaxrs-json-provider",
+    excludeDependencies += "javax.ws.rs" % "jsr311-api")
 }

 /**
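The sbt-side settings above mirror the Maven <exclusion> and the removed 2.7.8 jackson-jaxrs jars because, as the new ExcludeDependencies comment notes, sbt's (Ivy-based) conflict resolution differs from Maven's nearest-wins rule; without the pins and exclusions the newer 2.7.8 jackson-jaxrs artifacts would presumably win and reappear on the classpath. A minimal, hypothetical build.sbt sketch of the same pin-and-exclude pattern, using the coordinates and the 2.6.7 version from this patch:

    // Hypothetical standalone build.sbt fragment; not part of the patch.
    // Pin the Jackson core modules to the version Spark manages...
    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"
    // ...and drop the transitive modules that would pull in a different Jackson line.
    excludeDependencies += "com.fasterxml.jackson.jaxrs" % "jackson-jaxrs-json-provider"
    excludeDependencies += "javax.ws.rs" % "jsr311-api"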
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f8d98dead2d4..29517cab14ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -22,6 +22,7 @@ import java.net.URI
 import java.util.Locale

 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
 import org.scalatest.BeforeAndAfterEach

 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
@@ -2697,6 +2698,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }

   test(s"basic DDL using locale tr - caseSensitive $caseSensitive") {
+    // There seems to be a bug in how a few specific JDKs handle non-ASCII characters in file
+    // names through File.toPath(); local file systems started relying on File.toPath() in
+    // HADOOP-12045, so this affects Hadoop 2.8.0+ per the JIRA.
+    // See https://stackoverflow.com/questions/37409379/invalidpathexception-for-chinese-filename
+    assume(VersionInfo.getVersion < "2.8.0")
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
       withLocale("tr") {
         val dbName = "DaTaBaSe_I"
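For reference, org.apache.hadoop.util.VersionInfo.getVersion returns the compiled-in Hadoop version as a plain String, so the assume() guards added throughout this patch rely on lexicographic String comparison; that is sufficient to separate the 2.x and 3.x values being compared here. A small self-contained sketch (the printed value is illustrative):

    import org.apache.hadoop.util.VersionInfo

    object HadoopVersionGuardSketch extends App {
      // The assume() guards rely on plain lexicographic String ordering,
      // which is enough to tell the 2.x and 3.x lines apart:
      assert("2.7.4" < "2.8.0")       // guard passes: the test runs on Hadoop 2.7.x
      assert(!("3.1.0" < "2.8.0"))    // guard fails: the test is skipped on Hadoop 3.x
      println(VersionInfo.getVersion) // e.g. "2.7.4" or "3.1.0", depending on the build
    }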
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 6a90c44a2633..d33440a65dec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -177,7 +177,10 @@ private[hive] class IsolatedClientLoader(

   protected def isSharedClass(name: String): Boolean = {
     val isHadoopClass =
-      name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")
+      name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.") ||
+      // Also include commons-configuration2 as a minimal fix for Hadoop 3+ for now; without
+      // it, class resolution fails. It is fine when 'sharesHadoopClasses' is disabled.
+      name.startsWith("org.apache.commons.configuration2.")

     name.startsWith("org.slf4j") ||
     name.startsWith("org.apache.log4j") || // log4j1.x
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index a676cf6ce692..162f76697d3f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -22,6 +22,7 @@ import java.io.{BufferedWriter, File, FileWriter}
 import scala.util.Properties

 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
 import org.scalatest.{BeforeAndAfterEach, Matchers}

 import org.apache.spark._
@@ -123,6 +124,7 @@ class HiveSparkSubmitSuite
   }

   test("SPARK-8020: set sql conf in spark conf") {
+    assume(VersionInfo.getVersion < "3.0.0", "Only Hive 2.3+ supports Hadoop 3+. See HIVE-16081.")
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val args = Seq(
       "--class", SparkSQLConfTest.getClass.getName.stripSuffix("$"),
@@ -159,6 +161,7 @@ class HiveSparkSubmitSuite
   }

   test("SPARK-9757 Persist Parquet relation with decimal column") {
+    assume(VersionInfo.getVersion < "3.0.0", "Only Hive 2.3+ supports Hadoop 3+. See HIVE-16081.")
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val args = Seq(
       "--class", SPARK_9757.getClass.getName.stripSuffix("$"),
@@ -250,6 +253,7 @@ class HiveSparkSubmitSuite
   }

   test("SPARK-16901: set javax.jdo.option.ConnectionURL") {
+    assume(VersionInfo.getVersion < "3.0.0", "Only Hive 2.3+ supports Hadoop 3+. See HIVE-16081.")
     // In this test, we set javax.jdo.option.ConnectionURL and set metastore version to
     // 0.13. This test will make sure that javax.jdo.option.ConnectionURL will not be
     // overridden by hive's default settings when we create a HiveConf object inside
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index fa9f753795f6..145aae393326 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -74,10 +74,7 @@ class HiveClientSuite(version: String)
     }
   }

-  override def beforeAll() {
-    super.beforeAll()
-    client = init(true)
-  }
+  private lazy val client: HiveClient = init(true)

   test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
     val client = init(false)
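Replacing beforeAll with a lazy val above presumably lets a suite that is entirely skipped by the version guard added to HiveVersionSuite below avoid creating a metastore client at all: assume() cancels each test before the lazy val is ever dereferenced, whereas beforeAll would have built the client unconditionally. A standalone sketch of that interaction (hypothetical suite, not part of the patch):

    import org.scalatest.FunSuite

    class LazyClientSketch extends FunSuite {
      // With a lazy val, the expensive initialization only happens the first time a
      // test body actually reads the field; a beforeAll override would run it even
      // when every test is cancelled by assume().
      private lazy val client: String = {
        println("expensive init")
        "client"
      }

      test("guarded test") {
        assume(false, "environment does not apply, so `client` is never initialized")
        assert(client.nonEmpty)
      }
    }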
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index e5963d03f6b5..6cd7ab403a1d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.client

 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
 import org.scalactic.source.Position
 import org.scalatest.Tag

@@ -26,7 +27,6 @@ import org.apache.spark.sql.hive.HiveUtils

 private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
   override protected val enableAutoThreadAudit = false
-  protected var client: HiveClient = null

   protected def buildClient(
       hadoopConf: Configuration,
@@ -49,6 +49,11 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu

   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
       (implicit pos: Position): Unit = {
-    super.test(s"$version: $testName", testTags: _*)(testFun)
+    super.test(s"$version: $testName", testTags: _*) {
+      assume(
+        VersionInfo.getVersion < "3.0.0" || version >= "2.3",
+        "Hive 2.3+ supports Hadoop 3+. See HIVE-16081.")
+      testFun
+    }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dc96ec416afd..c02b1cecdc34 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.util.VersionInfo

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
@@ -75,19 +76,29 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
   }

-  test("success sanity check") {
+  private val testBuiltinVersion = testHiveVersion(HiveUtils.builtinHiveVersion, _: String) _
+  testBuiltinVersion("success sanity check") {
     val badClient = buildClient(HiveUtils.builtinHiveVersion, new Configuration())
     val db = new CatalogDatabase("default", "desc", new URI("loc"), Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }

-  test("hadoop configuration preserved") {
+  testBuiltinVersion("hadoop configuration preserved") {
     val hadoopConf = new Configuration()
     hadoopConf.set("test", "success")
     val client = buildClient(HiveUtils.builtinHiveVersion, hadoopConf)
     assert("success" === client.getConf("test", null))
   }

+  private def testHiveVersion(hiveVersion: String, title: String)(func: => Unit): Unit = {
+    test(title) {
+      assume(
+        VersionInfo.getVersion < "3.0.0" || hiveVersion >= "2.3",
+        "Hive 2.3+ supports Hadoop 3+. See HIVE-16081.")
+      func
+    }
+  }
+
   private def getNestedMessages(e: Throwable): String = {
     var causes = ""
     var lastException = e
@@ -119,7 +130,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
   private var versionSpark: TestHiveVersion = null

   versions.foreach { version =>
-    test(s"$version: create client") {
+    val testVersion = testHiveVersion(version, _: String) _
+    testVersion(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
       val hadoopConf = new Configuration()
@@ -159,7 +171,7 @@ class VersionsSuite extends SparkFunSuite with Logging {

     val tempDatabasePath = Utils.createTempDir().toURI

-    test(s"$version: createDatabase") {
+    testVersion(s"$version: createDatabase") {
       val defaultDB = CatalogDatabase("default", "desc", new URI("loc"), Map())
       client.createDatabase(defaultDB, ignoreIfExists = true)
       val tempDB = CatalogDatabase(
@@ -167,7 +179,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.createDatabase(tempDB, ignoreIfExists = true)
     }

-    test(s"$version: createDatabase with null description") {
+    testVersion(s"$version: createDatabase with null description") {
       withTempDir { tmpDir =>
         val dbWithNullDesc =
           CatalogDatabase("dbWithNullDesc", description = null, tmpDir.toURI, Map())
@@ -176,32 +188,32 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: setCurrentDatabase") {
+    testVersion(s"$version: setCurrentDatabase") {
       client.setCurrentDatabase("default")
     }

-    test(s"$version: getDatabase") {
+    testVersion(s"$version: getDatabase") {
       // No exception should be thrown
       client.getDatabase("default")
       intercept[NoSuchDatabaseException](client.getDatabase("nonexist"))
     }

-    test(s"$version: databaseExists") {
+    testVersion(s"$version: databaseExists") {
       assert(client.databaseExists("default") == true)
       assert(client.databaseExists("nonexist") == false)
     }

-    test(s"$version: listDatabases") {
+    testVersion(s"$version: listDatabases") {
       assert(client.listDatabases("defau.*") == Seq("default"))
     }

-    test(s"$version: alterDatabase") {
+    testVersion(s"$version: alterDatabase") {
       val database = client.getDatabase("temporary").copy(properties = Map("flag" -> "true"))
       client.alterDatabase(database)
       assert(client.getDatabase("temporary").properties.contains("flag"))
     }

-    test(s"$version: dropDatabase") {
+    testVersion(s"$version: dropDatabase") {
       assert(client.databaseExists("temporary") == true)
       client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true)
       assert(client.databaseExists("temporary") == false)
@@ -211,12 +223,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
     // Table related API
     ///////////////////////////////////////////////////////////////////////////

-    test(s"$version: createTable") {
+    testVersion(s"$version: createTable") {
       client.createTable(table("default", tableName = "src"), ignoreIfExists = false)
       client.createTable(table("default", "temporary"), ignoreIfExists = false)
     }

-    test(s"$version: loadTable") {
+    testVersion(s"$version: loadTable") {
       client.loadTable(
         emptyDir,
         tableName = "src",
@@ -224,34 +236,34 @@ class VersionsSuite extends SparkFunSuite with Logging {
         isSrcLocal = false)
     }

-    test(s"$version: tableExists") {
+    testVersion(s"$version: tableExists") {
       // No exception should be thrown
       assert(client.tableExists("default", "src"))
       assert(!client.tableExists("default", "nonexistent"))
     }

-    test(s"$version: getTable") {
+    testVersion(s"$version: getTable") {
       // No exception should be thrown
       client.getTable("default", "src")
     }

-    test(s"$version: getTableOption") {
+    testVersion(s"$version: getTableOption") {
       assert(client.getTableOption("default", "src").isDefined)
     }

-    test(s"$version: alterTable(table: CatalogTable)") {
+    testVersion(s"$version: alterTable(table: CatalogTable)") {
       val newTable = client.getTable("default", "src").copy(properties = Map("changed" -> ""))
       client.alterTable(newTable)
       assert(client.getTable("default", "src").properties.contains("changed"))
     }

-    test(s"$version: alterTable(dbName: String, tableName: String, table: CatalogTable)") {
+    testVersion(s"$version: alterTable(dbName: String, tableName: String, table: CatalogTable)") {
       val newTable = client.getTable("default", "src").copy(properties = Map("changedAgain" -> ""))
       client.alterTable("default", "src", newTable)
       assert(client.getTable("default", "src").properties.contains("changedAgain"))
     }

-    test(s"$version: alterTable - rename") {
+    testVersion(s"$version: alterTable - rename") {
       val newTable = client.getTable("default", "src")
         .copy(identifier = TableIdentifier("tgt", database = Some("default")))
       assert(!client.tableExists("default", "tgt"))
@@ -262,7 +274,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(!client.tableExists("default", "src"))
     }

-    test(s"$version: alterTable - change database") {
+    testVersion(s"$version: alterTable - change database") {
       val tempDB = CatalogDatabase(
         "temporary", description = "test create", tempDatabasePath, Map())
       client.createDatabase(tempDB, ignoreIfExists = true)
@@ -277,7 +289,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(!client.tableExists("default", "tgt"))
     }

-    test(s"$version: alterTable - change database and table names") {
+    testVersion(s"$version: alterTable - change database and table names") {
       val newTable = client.getTable("temporary", "tgt")
         .copy(identifier = TableIdentifier("src", database = Some("default")))
       assert(!client.tableExists("default", "src"))
@@ -288,16 +300,16 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(!client.tableExists("temporary", "tgt"))
     }

-    test(s"$version: listTables(database)") {
+    testVersion(s"$version: listTables(database)") {
       assert(client.listTables("default") === Seq("src", "temporary"))
     }

-    test(s"$version: listTables(database, pattern)") {
+    testVersion(s"$version: listTables(database, pattern)") {
       assert(client.listTables("default", pattern = "src") === Seq("src"))
       assert(client.listTables("default", pattern = "nonexist").isEmpty)
     }

-    test(s"$version: dropTable") {
+    testVersion(s"$version: dropTable") {
       val versionsWithoutPurge = versions.takeWhile(_ != "0.14")
       // First try with the purge option set. This should fail if the version is < 0.14, in which
       // case we check the version and try without it.
@@ -326,13 +338,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
       compressed = false,
       properties = Map.empty)

-    test(s"$version: sql create partitioned table") {
+    testVersion(s"$version: sql create partitioned table") {
       client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
     }

     val testPartitionCount = 2

-    test(s"$version: createPartitions") {
+    testVersion(s"$version: createPartitions") {
       val partitions = (1 to testPartitionCount).map { key2 =>
         CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
       }
@@ -340,17 +352,17 @@ class VersionsSuite extends SparkFunSuite with Logging {
         "default", "src_part", partitions, ignoreIfExists = true)
     }

-    test(s"$version: getPartitionNames(catalogTable)") {
+    testVersion(s"$version: getPartitionNames(catalogTable)") {
       val partitionNames = (1 to testPartitionCount).map(key2 => s"key1=1/key2=$key2")
       assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part")))
     }

-    test(s"$version: getPartitions(catalogTable)") {
+    testVersion(s"$version: getPartitions(catalogTable)") {
       assert(testPartitionCount ==
         client.getPartitions(client.getTable("default", "src_part")).size)
     }

-    test(s"$version: getPartitionsByFilter") {
+    testVersion(s"$version: getPartitionsByFilter") {
       // Only one partition [1, 1] for key2 == 1
       val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
         Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
@@ -363,28 +375,29 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: getPartition") {
+    testVersion(s"$version: getPartition") {
       // No exception should be thrown
       client.getPartition("default", "src_part", Map("key1" -> "1", "key2" -> "2"))
     }

-    test(s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") {
+    testVersion(
+      s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") {
       val partition = client.getPartitionOption(
         "default", "src_part", Map("key1" -> "1", "key2" -> "2"))
       assert(partition.isDefined)
     }

-    test(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") {
+    testVersion(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") {
       val partition = client.getPartitionOption(
         client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
       assert(partition.isDefined)
     }

-    test(s"$version: getPartitions(db: String, table: String)") {
+    testVersion(s"$version: getPartitions(db: String, table: String)") {
       assert(testPartitionCount == client.getPartitions("default", "src_part", None).size)
     }

-    test(s"$version: loadPartition") {
+    testVersion(s"$version: loadPartition") {
       val partSpec = new java.util.LinkedHashMap[String, String]
       partSpec.put("key1", "1")
       partSpec.put("key2", "2")
@@ -399,7 +412,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
         isSrcLocal = false)
     }

-    test(s"$version: loadDynamicPartitions") {
+    testVersion(s"$version: loadDynamicPartitions") {
       val partSpec = new java.util.LinkedHashMap[String, String]
       partSpec.put("key1", "1")
       partSpec.put("key2", "") // Dynamic partition
@@ -413,7 +426,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
         numDP = 1)
     }

-    test(s"$version: renamePartitions") {
+    testVersion(s"$version: renamePartitions") {
       val oldSpec = Map("key1" -> "1", "key2" -> "1")
       val newSpec = Map("key1" -> "1", "key2" -> "3")
       client.renamePartitions("default", "src_part", Seq(oldSpec), Seq(newSpec))
@@ -422,7 +435,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", newSpec).isDefined)
     }

-    test(s"$version: alterPartitions") {
+    testVersion(s"$version: alterPartitions") {
       val spec = Map("key1" -> "1", "key2" -> "2")
       val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
       val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
@@ -438,7 +451,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
         .parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0"))
     }

-    test(s"$version: dropPartitions") {
+    testVersion(s"$version: dropPartitions") {
       val spec = Map("key1" -> "1", "key2" -> "3")
       val versionsWithoutPurge = versions.takeWhile(_ != "1.2")
       // Similar to dropTable; try with purge set, and if it fails, make sure we're running
@@ -466,7 +479,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
         FunctionIdentifier(name, Some("default")), className, Seq.empty[FunctionResource])
     }

-    test(s"$version: createFunction") {
+    testVersion(s"$version: createFunction") {
       val functionClass = "org.apache.spark.MyFunc1"
       if (version == "0.12") {
         // Hive 0.12 doesn't support creating permanent functions
@@ -478,7 +491,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: functionExists") {
+    testVersion(s"$version: functionExists") {
       if (version == "0.12") {
         // Hive 0.12 doesn't allow customized permanent functions
         assert(client.functionExists("default", "func1") == false)
@@ -487,7 +500,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: renameFunction") {
+    testVersion(s"$version: renameFunction") {
       if (version == "0.12") {
         // Hive 0.12 doesn't allow customized permanent functions
         intercept[NoSuchPermanentFunctionException] {
@@ -499,7 +512,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: alterFunction") {
+    testVersion(s"$version: alterFunction") {
       val functionClass = "org.apache.spark.MyFunc2"
       if (version == "0.12") {
         // Hive 0.12 doesn't allow customized permanent functions
@@ -511,7 +524,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: getFunction") {
+    testVersion(s"$version: getFunction") {
      if (version == "0.12") {
         // Hive 0.12 doesn't allow customized permanent functions
         intercept[NoSuchPermanentFunctionException] {
@@ -524,7 +537,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: getFunctionOption") {
+    testVersion(s"$version: getFunctionOption") {
       if (version == "0.12") {
         // Hive 0.12 doesn't allow customized permanent functions
         assert(client.getFunctionOption("default", "func2").isEmpty)
@@ -534,7 +547,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: listFunctions") {
+    testVersion(s"$version: listFunctions") {
       if (version == "0.12") {
         // Hive 0.12 doesn't allow customized permanent functions
         assert(client.listFunctions("default", "fun.*").isEmpty)
@@ -543,7 +556,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: dropFunction") {
+    testVersion(s"$version: dropFunction") {
       if (version == "0.12") {
         // Hive 0.12 doesn't support creating permanent functions
         intercept[NoSuchPermanentFunctionException] {
@@ -560,11 +573,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
     // SQL related API
     ///////////////////////////////////////////////////////////////////////////

-    test(s"$version: sql set command") {
+    testVersion(s"$version: sql set command") {
       client.runSqlHive("SET spark.sql.test.key=1")
     }

-    test(s"$version: sql create index and reset") {
+    testVersion(s"$version: sql create index and reset") {
       client.runSqlHive("CREATE TABLE indexed_table (key INT)")
       client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
         "as 'COMPACT' WITH DEFERRED REBUILD")
@@ -574,32 +587,32 @@ class VersionsSuite extends SparkFunSuite with Logging {
     // Miscellaneous API
     ///////////////////////////////////////////////////////////////////////////

-    test(s"$version: version") {
+    testVersion(s"$version: version") {
       assert(client.version.fullVersion.startsWith(version))
     }

-    test(s"$version: getConf") {
+    testVersion(s"$version: getConf") {
       assert("success" === client.getConf("test", null))
     }

-    test(s"$version: setOut") {
+    testVersion(s"$version: setOut") {
       client.setOut(new PrintStream(new ByteArrayOutputStream()))
     }

-    test(s"$version: setInfo") {
+    testVersion(s"$version: setInfo") {
       client.setInfo(new PrintStream(new ByteArrayOutputStream()))
     }

-    test(s"$version: setError") {
+    testVersion(s"$version: setError") {
       client.setError(new PrintStream(new ByteArrayOutputStream()))
     }

-    test(s"$version: newSession") {
+    testVersion(s"$version: newSession") {
       val newClient = client.newSession()
       assert(newClient != null)
     }

-    test(s"$version: withHiveState and addJar") {
+    testVersion(s"$version: withHiveState and addJar") {
       val newClassPath = "."
       client.addJar(newClassPath)
       client.withHiveState {
@@ -613,7 +626,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: reset") {
+    testVersion(s"$version: reset") {
       // Clears all database, tables, functions...
       client.reset()
       assert(client.listTables("default").isEmpty)
@@ -623,7 +636,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     // End-To-End tests
     ///////////////////////////////////////////////////////////////////////////

-    test(s"$version: CREATE TABLE AS SELECT") {
+    testVersion(s"$version: CREATE TABLE AS SELECT") {
       withTable("tbl") {
         versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
         assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))
@@ -638,7 +651,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: CREATE Partitioned TABLE AS SELECT") {
+    testVersion(s"$version: CREATE Partitioned TABLE AS SELECT") {
       withTable("tbl") {
         versionSpark.sql(
           """
@@ -678,7 +691,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: Delete the temporary staging directory and files after each insert") {
+    testVersion(s"$version: Delete the temporary staging directory and files after each insert") {
       withTempDir { tmpDir =>
         withTable("tab") {
           versionSpark.sql(
@@ -704,7 +717,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
+    testVersion(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
       withTempDir { dir =>
         val path = dir.toURI.toString
         val tableName = "spark_13709"
@@ -781,7 +794,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: CTAS for managed data source tables") {
+    testVersion(s"$version: CTAS for managed data source tables") {
       withTable("t", "t1") {
         versionSpark.range(1).write.saveAsTable("t")
         assert(versionSpark.table("t").collect() === Array(Row(0)))
@@ -790,7 +803,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: Decimal support of Avro Hive serde") {
+    testVersion(s"$version: Decimal support of Avro Hive serde") {
       val tableName = "tab1"
       // TODO: add the other logical types. For details, see the link:
       // https://avro.apache.org/docs/1.8.1/spec.html#Logical+Types
@@ -856,7 +869,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: read avro file containing decimal") {
+    testVersion(s"$version: read avro file containing decimal") {
       val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
       val location = new File(url.getFile).toURI.toString

@@ -897,7 +910,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

-    test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+    testVersion(s"$version: SPARK-17920: Insert into/overwrite avro table") {
       // skipped because it's failed in the condition on Windows
       assume(!(Utils.isWindows && version == "0.12"))
       withTempDir { dir =>
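The `testHiveVersion(version, _: String) _` aliases in VersionsSuite above partially apply the first parameter list and eta-expand the second, so `testVersion` and `testBuiltinVersion` can be called exactly like ScalaTest's `test` while baking in the Hive version and its Hadoop guard. A self-contained sketch of the equivalent, more explicit spelling (hypothetical suite; the version values are illustrative):

    import org.apache.hadoop.util.VersionInfo
    import org.scalatest.FunSuite

    class CurriedTestSketch extends FunSuite {
      private def testHiveVersion(hiveVersion: String, title: String)(func: => Unit): Unit = {
        test(title) {
          assume(
            VersionInfo.getVersion < "3.0.0" || hiveVersion >= "2.3",
            "Hive 2.3+ supports Hadoop 3+. See HIVE-16081.")
          func
        }
      }

      Seq("0.12", "2.3").foreach { version =>
        // Equivalent to `val testVersion = testHiveVersion(version, _: String) _`:
        def testVersion(title: String)(func: => Unit): Unit = testHiveVersion(version, title)(func)

        testVersion(s"$version: sketch") {
          assert(version.nonEmpty)
        }
      }
    }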
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index be1aa83d682b..087b41706cfe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -24,6 +24,7 @@ import java.util.Date
 import scala.language.existentials

 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
 import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.scalatest.BeforeAndAfterEach
@@ -397,6 +398,11 @@ class HiveDDLSuite
   }

   test("create Hive-serde table and view with unicode columns and comment") {
+    // There seems to be a bug in how a few specific JDKs handle non-ASCII characters in file
+    // names through File.toPath(); local file systems started relying on File.toPath() in
+    // HADOOP-12045, so this affects Hadoop 2.8.0+ per the JIRA.
+    // See https://stackoverflow.com/questions/37409379/invalidpathexception-for-chinese-filename
+    assume(VersionInfo.getVersion < "2.8.0")
     val catalog = spark.sessionState.catalog
     val tabName = "tab1"
     val viewName = "view1"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
index eaedac1fa95d..2047ecb62e5f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import scala.language.existentials

 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -66,13 +67,6 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with Before
     new HiveExternalCatalog(sparkConf, hadoopConf)
   }

-  override def afterEach: Unit = {
-    catalog.listTables("default").foreach { t =>
-      catalog.dropTable("default", t, true, false)
-    }
-    spark.sessionState.catalog.reset()
-  }
-
   override def afterAll(): Unit = {
     try {
       catalog = null
@@ -81,7 +75,19 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with Before
     }
   }

-  test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") {
+  private def test_2_1(title: String)(func: => Unit): Unit = test(title) {
+    assume(VersionInfo.getVersion < "3.0.0", "Only Hive 2.3+ supports Hadoop 3+. See HIVE-16081.")
+    try {
+      func
+    } finally {
+      catalog.listTables("default").foreach { t =>
+        catalog.dropTable("default", t, true, false)
+      }
+      spark.sessionState.catalog.reset()
+    }
+  }
+
+  test_2_1("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") {
     testAlterTable(
       "t1",
       "CREATE TABLE t1 (c1 int) USING json",
@@ -89,21 +95,21 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with Before
       hiveCompatible = false)
   }

-  test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") {
+  test_2_1("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") {
     testAlterTable(
       "t1",
       "CREATE TABLE t1 (c1 int) USING parquet",
       StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
   }

-  test("SPARK-21617: ALTER TABLE for Hive tables") {
+  test_2_1("SPARK-21617: ALTER TABLE for Hive tables") {
     testAlterTable(
       "t1",
       "CREATE TABLE t1 (c1 int) STORED AS parquet",
       StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
   }

-  test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") {
+  test_2_1("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") {
     val exception = intercept[AnalysisException] {
       testAlterTable(
         "t1",
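Both HiveVersionSuite's overridden `test()` and `test_2_1` above follow the same pattern: registration still goes through ScalaTest's `test`, but every body is preceded by an environment check via `assume()` and, in the DDL suite, followed by the cleanup that used to live in `afterEach`. A generic, hypothetical sketch of that wrapping pattern:

    import org.scalatest.FunSuite

    class GuardedTestSketch extends FunSuite {
      // Every registered body is guarded and cleaned up in one place, so a
      // separate afterEach override is no longer needed.
      private def guardedTest(name: String)(body: => Unit): Unit = test(name) {
        assume(sys.props.contains("java.version"), "skip when the environment does not apply")
        try {
          body
        } finally {
          // per-test cleanup (e.g. dropping tables) would go here
        }
      }

      guardedTest("example") {
        assert(1 + 1 == 2)
      }
    }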