From 513098cf523baa7d6eb3ad7a2d7995bfdef96921 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 7 Mar 2019 00:17:58 +0800
Subject: [PATCH 1/5] Refactor HiveClientImpl runHive

---
 .../sql/hive/client/HiveClientImpl.scala      | 32 +++++--
 .../hive/client/Hive_3_1_ClientSuite.scala    | 93 +++++++++++++++++++
 2 files changed, 117 insertions(+), 8 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2ca54afa31e6d..41db0548613b0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -711,6 +711,16 @@ private[hive] class HiveClientImpl(
   protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
     logDebug(s"Running hiveql '$cmd'")
     if (cmd.toLowerCase(Locale.ROOT).startsWith("set")) { logDebug(s"Changing config: $cmd") }
+
+    def runProcessor(proc: CommandProcessor, tokens: Array[String], cmd_1: String): Seq[String] = {
+      if (state.out != null) {
+        // scalastyle:off println
+        state.out.println(tokens(0) + " " + cmd_1)
+        // scalastyle:on println
+      }
+      Seq(proc.run(cmd_1).getResponseCode.toString)
+    }
+
     try {
       val cmd_trimmed: String = cmd.trim()
       val tokens: Array[String] = cmd_trimmed.split("\\s+")
@@ -718,7 +728,9 @@ private[hive] class HiveClientImpl(
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
       val proc = shim.getCommandProcessor(tokens(0), conf)
       proc match {
-        case driver: Driver =>
+        // HIVE-18238(Hive 3.0.0) changed the close() function return type.
+        // This change is not compatible with the built-in Hive.
+        case driver: Driver if version != hive.v3_1 =>
           val response: CommandProcessorResponse = driver.run(cmd)
           // Throw an exception if there is an error in query processing.
           if (response.getResponseCode != 0) {
@@ -733,13 +745,17 @@ private[hive] class HiveClientImpl(
           CommandProcessorFactory.clean(conf)
           results
 
-        case _ =>
-          if (state.out != null) {
-            // scalastyle:off println
-            state.out.println(tokens(0) + " " + cmd_1)
-            // scalastyle:on println
-          }
-          Seq(proc.run(cmd_1).getResponseCode.toString)
+        case _: SetProcessor | _: ResetProcessor =>
+          runProcessor(proc, tokens, cmd_1)
+        case _: AddResourceProcessor | _: DeleteResourceProcessor | _: ListResourceProcessor =>
+          runProcessor(proc, tokens, cmd_1)
+        case _: CompileProcessor | _: CryptoProcessor | _: DfsProcessor | _: ReloadProcessor =>
+          runProcessor(proc, tokens, cmd_1)
+
+        case unsupportedProcessor =>
+          val className = unsupportedProcessor.getClass.getCanonicalName
+          throw new AnalysisException(
+            s"Does not support Hive ${version.fullVersion} processor: $className")
       }
     } catch {
       case e: Exception =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala
new file mode 100644
index 0000000000000..45874d1c0631c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import scala.language.existentials
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.UI.UI_ENABLED
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.tags.ExtendedHiveTest
+import org.apache.spark.util.Utils
+
+/**
+ * A separate test that uses Hive 3.1 libraries, which behave a little differently
+ * from the built-in ones.
+ */
+@ExtendedHiveTest
+class Hive_3_1_ClientSuite extends SparkFunSuite with TestHiveSingleton {
+
+  private var catalog = {
+    val warehouse = Utils.createTempDir()
+    val metastore = Utils.createTempDir()
+    metastore.delete()
+    val sparkConf = new SparkConf()
+      .set(SparkLauncher.SPARK_MASTER, "local")
+      .set(UI_ENABLED, false)
+      .set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
+      .set(CATALOG_IMPLEMENTATION.key, "hive")
+      .set(HiveUtils.HIVE_METASTORE_VERSION.key, "3.1")
+      .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
+
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
+    hadoopConf.set("javax.jdo.option.ConnectionURL",
+      s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
+    // These options are needed since the defaults in Hive 3.1 cause exceptions with an
+    // empty metastore db.
+    hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+    hadoopConf.set("hive.metastore.schema.verification", "false")
+    hadoopConf.set("hive.in.test", "true")
+
+    new HiveExternalCatalog(sparkConf, hadoopConf)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      catalog = null
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("Hive 3.1.1 does not fully support runSqlHive") {
+    // HIVE-17626(Hive 3.0.0) added ReExecDriver
+    val e1 = intercept[AnalysisException] {
+      catalog.client.runSqlHive("create table t1(c1 int)")
+    }.getMessage
+    assert(e1.contains(
+      "Does not support Hive 3.1.1 processor: org.apache.hadoop.hive.ql.reexec.ReExecDriver"))
+
+    catalog.client.runSqlHive("set hive.query.reexecution.enabled=false")
+
+    // HIVE-18238(Hive 3.0.0) changed the close() function return type.
+    // This change is not compatible with the built-in Hive.
+    val e2 = intercept[AnalysisException] {
+      catalog.client.runSqlHive("create table t2(c1 int)")
+    }.getMessage
+    assert(e2.contains(
+      "Does not support Hive 3.1.1 processor: org.apache.hadoop.hive.ql.Driver"))
+  }
+
+}

From c266a8e92d4edab55199ee9ff5638837adb88703 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 12 May 2019 23:22:57 +0800
Subject: [PATCH 2/5] set hive.query.reexecution.enabled=false

---
 .../sql/hive/client/HiveClientImpl.scala      | 71 +++++++-------
 .../hive/client/Hive_3_1_ClientSuite.scala    | 93 -------------------
 .../spark/sql/hive/client/VersionsSuite.scala | 11 ++-
 3 files changed, 44 insertions(+), 131 deletions(-)
 delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 7c6dee16a3651..4f4bb7846d26c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -712,20 +712,20 @@ private[hive] class HiveClientImpl(
    * in the sequence is one row.
    * Since upgrading the built-in Hive to 2.3, hive-llap-client is needed when
    * running MapReduce jobs with `runHive`.
+   * Since HIVE-17626(Hive 3.0.0), `hive.query.reexecution.enabled` must be set to `false`.
    */
   protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
-    logDebug(s"Running hiveql '$cmd'")
-    if (cmd.toLowerCase(Locale.ROOT).startsWith("set")) { logDebug(s"Changing config: $cmd") }
-
-    def runProcessor(proc: CommandProcessor, tokens: Array[String], cmd_1: String): Seq[String] = {
-      if (state.out != null) {
-        // scalastyle:off println
-        state.out.println(tokens(0) + " " + cmd_1)
-        // scalastyle:on println
+    def closeDriver(driver: Driver): Unit = {
+      // Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
+      // and the CommandProcessorFactory.clean function was removed.
+      driver.getClass.getMethod("close").invoke(driver)
+      if (version != hive.v3_1) {
+        CommandProcessorFactory.clean(conf)
       }
-      Seq(proc.run(cmd_1).getResponseCode.toString)
     }
 
+    logDebug(s"Running hiveql '$cmd'")
+    if (cmd.toLowerCase(Locale.ROOT).startsWith("set")) { logDebug(s"Changing config: $cmd") }
     try {
       val cmd_trimmed: String = cmd.trim()
       val tokens: Array[String] = cmd_trimmed.split("\\s+")
@@ -733,34 +733,26 @@ private[hive] class HiveClientImpl(
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
       val proc = shim.getCommandProcessor(tokens(0), conf)
       proc match {
-        // HIVE-18238(Hive 3.0.0) changed the close() function return type.
-        // This change is not compatible with the built-in Hive.
-        case driver: Driver if version != hive.v3_1 =>
+        case driver: Driver =>
           val response: CommandProcessorResponse = driver.run(cmd)
           // Throw an exception if there is an error in query processing.
           if (response.getResponseCode != 0) {
-            driver.close()
-            CommandProcessorFactory.clean(conf)
+            closeDriver(driver)
             throw new QueryExecutionException(response.getErrorMessage)
           }
           driver.setMaxRows(maxRows)
           val results = shim.getDriverResults(driver)
-          driver.close()
-          CommandProcessorFactory.clean(conf)
+          closeDriver(driver)
           results
 
-        case _: SetProcessor | _: ResetProcessor =>
-          runProcessor(proc, tokens, cmd_1)
-        case _: AddResourceProcessor | _: DeleteResourceProcessor | _: ListResourceProcessor =>
-          runProcessor(proc, tokens, cmd_1)
-        case _: CompileProcessor | _: CryptoProcessor | _: DfsProcessor | _: ReloadProcessor =>
-          runProcessor(proc, tokens, cmd_1)
-
-        case unsupportedProcessor =>
-          val className = unsupportedProcessor.getClass.getCanonicalName
-          throw new AnalysisException(
-            s"Does not support Hive ${version.fullVersion} processor: $className")
+        case _ =>
+          if (state.out != null) {
+            // scalastyle:off println
+            state.out.println(tokens(0) + " " + cmd_1)
+            // scalastyle:on println
+          }
+          Seq(proc.run(cmd_1).getResponseCode.toString)
       }
     } catch {
       case e: Exception =>
@@ -872,20 +864,31 @@ private[hive] class HiveClientImpl(
   }
 
   def reset(): Unit = withHiveState {
-    client.getAllTables("default").asScala.foreach { t =>
-      logDebug(s"Deleting table $t")
-      val table = client.getTable("default", t)
+    val allTables = client.getAllTables("default")
+    val (mvs, others) = allTables.asScala.map(t => client.getTable("default", t))
+      .partition(_.getTableType.toString.equals("MATERIALIZED_VIEW"))
+
+    // Remove materialized views first, otherwise dropping tables violates foreign key constraints.
+    mvs.foreach { table =>
+      val tableName = table.getTableName
+      logDebug(s"Deleting materialized view $tableName")
+      client.dropTable("default", tableName)
+    }
+
+    others.foreach { table =>
+      val tableName = table.getTableName
+      logDebug(s"Deleting table $tableName")
       try {
-        client.getIndexes("default", t, 255).asScala.foreach { index =>
-          shim.dropIndex(client, "default", t, index.getIndexName)
+        client.getIndexes("default", tableName, 255).asScala.foreach { index =>
+          shim.dropIndex(client, "default", tableName, index.getIndexName)
         }
         if (!table.isIndexTable) {
-          client.dropTable("default", t)
+          client.dropTable("default", tableName)
         }
       } catch {
         case _: NoSuchMethodError =>
           // HIVE-18448 Hive 3.0 remove index APIs
-          client.dropTable("default", t)
+          client.dropTable("default", tableName)
       }
     }
     client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala
deleted file mode 100644
index 45874d1c0631c..0000000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.client
-
-import scala.language.existentials
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.tags.ExtendedHiveTest
-import org.apache.spark.util.Utils
-
-/**
- * A separate test that uses Hive 3.1 libraries, which behave a little differently
- * from the built-in ones.
- */
-@ExtendedHiveTest
-class Hive_3_1_ClientSuite extends SparkFunSuite with TestHiveSingleton {
-
-  private var catalog = {
-    val warehouse = Utils.createTempDir()
-    val metastore = Utils.createTempDir()
-    metastore.delete()
-    val sparkConf = new SparkConf()
-      .set(SparkLauncher.SPARK_MASTER, "local")
-      .set(UI_ENABLED, false)
-      .set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
-      .set(CATALOG_IMPLEMENTATION.key, "hive")
-      .set(HiveUtils.HIVE_METASTORE_VERSION.key, "3.1")
-      .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
-
-    val hadoopConf = new Configuration()
-    hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
-    hadoopConf.set("javax.jdo.option.ConnectionURL",
-      s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
-    // These options are needed since the defaults in Hive 3.1 cause exceptions with an
-    // empty metastore db.
-    hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
-    hadoopConf.set("hive.metastore.schema.verification", "false")
-    hadoopConf.set("hive.in.test", "true")
-
-    new HiveExternalCatalog(sparkConf, hadoopConf)
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      catalog = null
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("Hive 3.1.1 does not fully support runSqlHive") {
-    // HIVE-17626(Hive 3.0.0) added ReExecDriver
-    val e1 = intercept[AnalysisException] {
-      catalog.client.runSqlHive("create table t1(c1 int)")
-    }.getMessage
-    assert(e1.contains(
-      "Does not support Hive 3.1.1 processor: org.apache.hadoop.hive.ql.reexec.ReExecDriver"))
-
-    catalog.client.runSqlHive("set hive.query.reexecution.enabled=false")
-
-    // HIVE-18238(Hive 3.0.0) changed the close() function return type.
-    // This change is not compatible with the built-in Hive.
-    val e2 = intercept[AnalysisException] {
-      catalog.client.runSqlHive("create table t2(c1 int)")
-    }.getMessage
-    assert(e2.contains(
-      "Does not support Hive 3.1.1 processor: org.apache.hadoop.hive.ql.Driver"))
-  }
-
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 8f2365c436696..23197780dbeb3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -123,9 +123,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
         hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
         hadoopConf.set("hive.metastore.schema.verification", "false")
       }
-      // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
       if (version == "3.1") {
+        // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
         hadoopConf.set("hive.in.test", "true")
+        // Since HIVE-17626(Hive 3.0.0), `hive.query.reexecution.enabled` must be set to `false`.
+        hadoopConf.set("hive.query.reexecution.enabled", "false")
       }
       client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
       if (versionSpark != null) versionSpark.reset()
@@ -584,10 +586,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
     test(s"$version: sql read hive materialized view") {
       // HIVE-14249 Since Hive 2.3.0, materialized view is supported.
-      // But skip Hive 3.1 because of SPARK-27074.
-      if (version == "2.3") {
+      if (version == "2.3" || version == "3.1") {
+        val disableRewrite = if (version == "2.3") "" else "DISABLE REWRITE"
         client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)")
-        client.runSqlHive("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM materialized_view_tbl")
+        client.runSqlHive(
+          s"CREATE MATERIALIZED VIEW mv1 $disableRewrite AS SELECT * FROM materialized_view_tbl")
         val e = intercept[AnalysisException](versionSpark.table("mv1").collect()).getMessage
         assert(e.contains("Hive materialized view is not supported"))
       }

From 53ed19d7e2f40fd866a07547bec1f091ef295769 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 14 May 2019 00:20:57 +0800
Subject: [PATCH 3/5] Remove Hive 3.1 materialized view test

---
 .../sql/hive/client/HiveClientImpl.scala      | 25 ++++++-------------
 .../spark/sql/hive/client/VersionsSuite.scala | 11 +++-----
 2 files changed, 11 insertions(+), 25 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 4f4bb7846d26c..91d2da6b850bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -864,31 +864,20 @@ private[hive] class HiveClientImpl(
   }
 
   def reset(): Unit = withHiveState {
-    val allTables = client.getAllTables("default")
-    val (mvs, others) = allTables.asScala.map(t => client.getTable("default", t))
-      .partition(_.getTableType.toString.equals("MATERIALIZED_VIEW"))
-
-    // Remove materialized views first, otherwise dropping tables violates foreign key constraints.
-    mvs.foreach { table =>
-      val tableName = table.getTableName
-      logDebug(s"Deleting materialized view $tableName")
-      client.dropTable("default", tableName)
-    }
-
-    others.foreach { table =>
-      val tableName = table.getTableName
-      logDebug(s"Deleting table $tableName")
+    client.getAllTables("default").asScala.foreach { t =>
+      logDebug(s"Deleting table $t")
+      val table = client.getTable("default", t)
       try {
-        client.getIndexes("default", tableName, 255).asScala.foreach { index =>
-          shim.dropIndex(client, "default", tableName, index.getIndexName)
+        client.getIndexes("default", t, 255).asScala.foreach { index =>
+          shim.dropIndex(client, "default", t, index.getIndexName)
         }
         if (!table.isIndexTable) {
-          client.dropTable("default", tableName)
+          client.dropTable("default", t)
         }
       } catch {
         case _: NoSuchMethodError =>
           // HIVE-18448 Hive 3.0 remove index APIs
-          client.dropTable("default", tableName)
+          client.dropTable("default", t)
       }
     }
     client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 23197780dbeb3..8f2365c436696 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -123,11 +123,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
         hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
         hadoopConf.set("hive.metastore.schema.verification", "false")
       }
+      // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
       if (version == "3.1") {
-        // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
         hadoopConf.set("hive.in.test", "true")
-        // Since HIVE-17626(Hive 3.0.0), `hive.query.reexecution.enabled` must be set to `false`.
-        hadoopConf.set("hive.query.reexecution.enabled", "false")
       }
       client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
       if (versionSpark != null) versionSpark.reset()
@@ -586,11 +584,10 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
     test(s"$version: sql read hive materialized view") {
      // HIVE-14249 Since Hive 2.3.0, materialized view is supported.
-      if (version == "2.3" || version == "3.1") {
-        val disableRewrite = if (version == "2.3") "" else "DISABLE REWRITE"
+      // But skip Hive 3.1 because of SPARK-27074.
+      if (version == "2.3") {
         client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)")
-        client.runSqlHive(
-          s"CREATE MATERIALIZED VIEW mv1 $disableRewrite AS SELECT * FROM materialized_view_tbl")
+        client.runSqlHive("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM materialized_view_tbl")
         val e = intercept[AnalysisException](versionSpark.table("mv1").collect()).getMessage
         assert(e.contains("Hive materialized view is not supported"))
       }

From 38cd02832931d5f9182536f29c853706d2258818 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 15 May 2019 00:36:01 +0800
Subject: [PATCH 4/5] Test Hive 3.1 runHive

---
 .../apache/spark/sql/hive/client/VersionsSuite.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 8f2365c436696..23197780dbeb3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -123,9 +123,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
         hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
         hadoopConf.set("hive.metastore.schema.verification", "false")
       }
-      // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
       if (version == "3.1") {
+        // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
         hadoopConf.set("hive.in.test", "true")
+        // Since HIVE-17626(Hive 3.0.0), `hive.query.reexecution.enabled` must be set to `false`.
+        hadoopConf.set("hive.query.reexecution.enabled", "false")
       }
       client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
       if (versionSpark != null) versionSpark.reset()
@@ -584,10 +586,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
     test(s"$version: sql read hive materialized view") {
       // HIVE-14249 Since Hive 2.3.0, materialized view is supported.
-      // But skip Hive 3.1 because of SPARK-27074.
-      if (version == "2.3") {
+      if (version == "2.3" || version == "3.1") {
+        val disableRewrite = if (version == "2.3") "" else "DISABLE REWRITE"
         client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)")
-        client.runSqlHive("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM materialized_view_tbl")
+        client.runSqlHive(
+          s"CREATE MATERIALIZED VIEW mv1 $disableRewrite AS SELECT * FROM materialized_view_tbl")
         val e = intercept[AnalysisException](versionSpark.table("mv1").collect()).getMessage
         assert(e.contains("Hive materialized view is not supported"))
       }

From 7cf2875aa71094bf2db6f7d24991533a91c6ad29 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 16 May 2019 21:26:00 +0800
Subject: [PATCH 5/5] Add comment

---
 .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 23197780dbeb3..328457948cb41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -587,6 +587,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: sql read hive materialized view") {
       // HIVE-14249 Since Hive 2.3.0, materialized view is supported.
       if (version == "2.3" || version == "3.1") {
+        // Since HIVE-14498(Hive 3.0), automatic rewriting for a materialized view cannot be
+        // enabled if the materialized view uses non-transactional tables.
         val disableRewrite = if (version == "2.3") "" else "DISABLE REWRITE"
         client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)")
         client.runSqlHive(
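
-- 
Reviewer note (placed where the mail signature would go, so `git am` ignores it;
not part of any patch above): patch 2 closes the Driver reflectively because the
built-in Hive 2.3 declares Driver.close() returning Int, while HIVE-18238
(Hive 3.0.0) changed it to return void. The JVM links a compiled call by name and
full descriptor, return type included, so bytecode compiled against one signature
fails with NoSuchMethodError against the other; resolving the method by name at
runtime sidesteps the mismatch. A minimal standalone sketch of the same trick --
the object and method names below are hypothetical, and it only assumes a
hive-exec jar (2.x or 3.x) on the classpath:

    import org.apache.hadoop.hive.ql.Driver

    object DriverCloseCompat {
      // Resolve close() by name at runtime instead of linking against a fixed
      // signature, so the same bytecode works whether close() returns Int
      // (Hive 2.3) or void (Hive 3.x).
      def closeCompat(driver: Driver): Unit = {
        driver.getClass.getMethod("close").invoke(driver)
      }
    }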