From 96c79e79b075ea1a3dd7c4761747cb2983e8d2ad Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 8 Jul 2019 15:38:34 +0800 Subject: [PATCH 1/3] Implement Spark's own GetTableTypesOperation --- .../sql/catalyst/catalog/interface.scala | 2 + .../SparkGetTableTypesOperation.scala | 84 +++++++++++++++++++ .../server/SparkSQLOperationManager.scala | 15 +++- .../SparkMetadataOperationSuite.scala | 16 ++++ .../cli/operation/GetTableTypesOperation.java | 2 +- .../cli/operation/GetTableTypesOperation.java | 2 +- 6 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 2d646721f87a..c6c1d3bfa634 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -561,6 +561,8 @@ object CatalogTableType { val EXTERNAL = new CatalogTableType("EXTERNAL") val MANAGED = new CatalogTableType("MANAGED") val VIEW = new CatalogTableType("VIEW") + + val tableTypes = Seq(EXTERNAL, MANAGED, VIEW) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala new file mode 100644 index 000000000000..959d93dc4dc1 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver + +import java.util.UUID + +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType +import org.apache.hive.service.cli._ +import org.apache.hive.service.cli.operation.GetTableTypesOperation +import org.apache.hive.service.cli.session.HiveSession + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ +import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.listener +import org.apache.spark.util.{Utils => SparkUtils} + +/** + * Spark's own GetTableTypesOperation + * + * @param sqlContext SQLContext to use + * @param parentSession a HiveSession from SessionManager + */ +private[hive] class SparkGetTableTypesOperation( + sqlContext: SQLContext, + parentSession: HiveSession) + extends GetTableTypesOperation(parentSession) with Logging { + + override def runInternal(): Unit = { + val statementId = UUID.randomUUID().toString + val logMsg = s"Listing table types" + logInfo(s"$logMsg with $statementId") + setState(OperationState.RUNNING) + // Always use the latest class loader provided by executionHive's state. + val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + if (isAuthV2Enabled) { + authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null) + } + + listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) + + try { + CatalogTableType.tableTypes.foreach { tableType => + if (tableType == EXTERNAL || tableType == EXTERNAL) { + rowSet.addRow(Array[AnyRef]("TABLE")) + } else if (tableType == VIEW) { + rowSet.addRow(Array[AnyRef](tableType.name)) + } else { + logError(s"Unknown table type: ${tableType.name}") + } + } + setState(OperationState.FINISHED) + } catch { + case e: HiveSQLException => + setState(OperationState.ERROR) + listener.onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw e + } + listener.onStatementFinish(statementId) + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 44b0908502e9..45d3d4fa89a2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -21,13 +21,13 @@ import java.util.{List => JList, Map => JMap} import java.util.concurrent.ConcurrentHashMap import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetColumnsOperation, GetSchemasOperation, MetadataOperation, Operation, OperationManager} +import org.apache.hive.service.cli.operation._ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveUtils -import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetColumnsOperation, SparkGetSchemasOperation, SparkGetTablesOperation} +import org.apache.spark.sql.hive.thriftserver._ import org.apache.spark.sql.internal.SQLConf /** @@ -108,6 +108,17 @@ private[thriftserver] class SparkSQLOperationManager() operation } + override def newGetTableTypesOperation( + parentSession: HiveSession): GetTableTypesOperation = synchronized { + val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) + require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + s" initialized or had already closed.") + val operation = new SparkGetTableTypesOperation(sqlContext, parentSession) + handleToOperation.put(operation.getHandle, operation) + logDebug(s"Created GetTableTypesOperation with session=$parentSession.") + operation + } + def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = { val iterator = confMap.entrySet().iterator() while (iterator.hasNext) { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index e524861b320f..4ca68bd37dd5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -166,4 +166,20 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { checkResult(metaData.getColumns(null, "%", "table_not_exist", null), Seq.empty) } } + + test("Spark's own GetTableTypesOperation(SparkGetTableTypesOperation)") { + def checkResult(rs: ResultSet, dbNames: Seq[String]): Unit = { + for (i <- dbNames.indices) { + assert(rs.next()) + assert(rs.getString("TABLE_TYPE") === dbNames(i)) + } + // Make sure there are no more elements + assert(!rs.next()) + } + + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + checkResult(metaData.getTableTypes, Seq("TABLE", "VIEW")) + } + } } diff --git a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index 3ae012a72764..b372f55cedd1 100644 --- a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -39,7 +39,7 @@ public class GetTableTypesOperation extends MetadataOperation { protected static TableSchema RESULT_SET_SCHEMA = new TableSchema() .addStringColumn("TABLE_TYPE", "Table type name."); - private final RowSet rowSet; + protected final RowSet rowSet; private final TableTypeMapping tableTypeMapping; protected GetTableTypesOperation(HiveSession parentSession) { diff --git a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index 875af7150fc1..cf330cbf4100 100644 --- a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -39,7 +39,7 @@ public class GetTableTypesOperation extends MetadataOperation { protected static TableSchema RESULT_SET_SCHEMA = new TableSchema() .addStringColumn("TABLE_TYPE", "Table type name."); - private final RowSet rowSet; + protected final RowSet rowSet; private final TableTypeMapping tableTypeMapping; protected GetTableTypesOperation(HiveSession parentSession) { From 9d6e73c7d3b4c6b0e18f9e940c747e70ec241cb3 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 8 Jul 2019 21:20:45 +0800 Subject: [PATCH 2/3] Add SparkMetadataOperationUtils --- .../SparkGetTableTypesOperation.scala | 22 +++++------- .../SparkGetTablesOperation.scala | 9 +---- .../SparkMetadataOperationUtils.scala | 34 +++++++++++++++++++ .../server/SparkSQLOperationManager.scala | 4 +-- 4 files changed, 45 insertions(+), 24 deletions(-) create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index 959d93dc4dc1..8a9b04a29030 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -27,8 +27,6 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.catalog.CatalogTableType -import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ -import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.listener import org.apache.spark.util.{Utils => SparkUtils} /** @@ -40,7 +38,7 @@ import org.apache.spark.util.{Utils => SparkUtils} private[hive] class SparkGetTableTypesOperation( sqlContext: SQLContext, parentSession: HiveSession) - extends GetTableTypesOperation(parentSession) with Logging { + extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging { override def runInternal(): Unit = { val statementId = UUID.randomUUID().toString @@ -55,7 +53,7 @@ private[hive] class SparkGetTableTypesOperation( authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null) } - listener.onStatementStart( + HiveThriftServer2.listener.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -63,22 +61,18 @@ private[hive] class SparkGetTableTypesOperation( parentSession.getUsername) try { - CatalogTableType.tableTypes.foreach { tableType => - if (tableType == EXTERNAL || tableType == EXTERNAL) { - rowSet.addRow(Array[AnyRef]("TABLE")) - } else if (tableType == VIEW) { - rowSet.addRow(Array[AnyRef](tableType.name)) - } else { - logError(s"Unknown table type: ${tableType.name}") - } + val tableTypes = CatalogTableType.tableTypes.map(tableTypeString).toSet + tableTypes.foreach { tableType => + rowSet.addRow(Array[AnyRef](tableType)) } setState(OperationState.FINISHED) } catch { case e: HiveSQLException => setState(OperationState.ERROR) - listener.onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) throw e } - listener.onStatementFinish(statementId) + HiveThriftServer2.listener.onStatementFinish(statementId) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 878683692fb6..5014ed80a553 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -53,7 +53,7 @@ private[hive] class SparkGetTablesOperation( tableName: String, tableTypes: JList[String]) extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) - with Logging{ + with SparkMetadataOperationUtils with Logging { override def runInternal(): Unit = { val statementId = UUID.randomUUID().toString @@ -139,11 +139,4 @@ private[hive] class SparkGetTablesOperation( rowSet.addRow(rowData) } } - - private def tableTypeString(tableType: CatalogTableType): String = tableType match { - case EXTERNAL | MANAGED => "TABLE" - case VIEW => "VIEW" - case t => - throw new IllegalArgumentException(s"Unknown table type is found at showCreateHiveTable: $t") - } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala new file mode 100644 index 000000000000..f4c4b04bada2 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver + +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW} + +/** + * Utils for metadata operations. + */ +private[hive] trait SparkMetadataOperationUtils { + + def tableTypeString(tableType: CatalogTableType): String = tableType match { + case EXTERNAL | MANAGED => "TABLE" + case VIEW => "VIEW" + case t => + throw new IllegalArgumentException(s"Unknown table type is found: $t") + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 45d3d4fa89a2..9b4198d7e7a7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -100,7 +100,7 @@ private[thriftserver] class SparkSQLOperationManager() columnName: String): GetColumnsOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + - s" initialized or had already closed.") + " initialized or had already closed.") val operation = new SparkGetColumnsOperation(sqlContext, parentSession, catalogName, schemaName, tableName, columnName) handleToOperation.put(operation.getHandle, operation) @@ -112,7 +112,7 @@ private[thriftserver] class SparkSQLOperationManager() parentSession: HiveSession): GetTableTypesOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + - s" initialized or had already closed.") + " initialized or had already closed.") val operation = new SparkGetTableTypesOperation(sqlContext, parentSession) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTableTypesOperation with session=$parentSession.") From a008c81eddff1bd252326f2acbb6b77605529556 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 13 Jul 2019 07:00:39 +0800 Subject: [PATCH 3/3] Merge master --- .../thriftserver/SparkGetTableTypesOperation.scala | 11 +++++++++-- .../thriftserver/SparkMetadataOperationSuite.scala | 6 +++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index 8a9b04a29030..8f2257f77d2a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -40,9 +40,16 @@ private[hive] class SparkGetTableTypesOperation( parentSession: HiveSession) extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging { + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString - val logMsg = s"Listing table types" + statementId = UUID.randomUUID().toString + val logMsg = "Listing table types" logInfo(s"$logMsg with $statementId") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index 4ca68bd37dd5..80a7db5405ca 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -168,10 +168,10 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { } test("Spark's own GetTableTypesOperation(SparkGetTableTypesOperation)") { - def checkResult(rs: ResultSet, dbNames: Seq[String]): Unit = { - for (i <- dbNames.indices) { + def checkResult(rs: ResultSet, tableTypes: Seq[String]): Unit = { + for (i <- tableTypes.indices) { assert(rs.next()) - assert(rs.getString("TABLE_TYPE") === dbNames(i)) + assert(rs.getString("TABLE_TYPE") === tableTypes(i)) } // Make sure there are no more elements assert(!rs.next())