From 9f528bcb8a5cbc60441e81568298ded334a66d36 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 22 Oct 2018 17:39:37 +0800 Subject: [PATCH 1/7] Fix SQL client tools cannot show schemas/tables --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 14 +++++++++++++- .../spark/sql/hive/thriftserver/SparkSQLEnv.scala | 11 +++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 64d924fdb009..fcd6ddfa2e96 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -90,9 +90,21 @@ object HiveThriftServer2 extends Logging { uiTab.foreach(_.detach()) } + val hadoopConf = SparkSQLEnv.sqlContext.sessionState.newHadoopConf() + val executionHive = HiveUtils.newClientForExecution( SparkSQLEnv.sqlContext.sparkContext.conf, - SparkSQLEnv.sqlContext.sessionState.newHadoopConf()) + hadoopConf + ) + + Seq(ConfVars.METASTOREURIS, + ConfVars.METASTORECONNECTURLKEY, + ConfVars.METASTORE_CONNECTION_USER_NAME, + ConfVars.METASTOREPWD, + ConfVars.METASTORE_CONNECTION_DRIVER, + ConfVars.METASTOREWAREHOUSE).foreach { key => + executionHive.conf.set(key.varname, hadoopConf.get(key.varname)) + } try { val server = new HiveThriftServer2(SparkSQLEnv.sqlContext) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 8980bcf88558..232c17f0089a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.thriftserver import java.io.PrintStream +import org.apache.hadoop.hive.conf.HiveConf.ConfVars + import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} @@ -55,6 +57,15 @@ private[hive] object SparkSQLEnv extends Logging { metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) sparkSession.conf.set(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion) + Seq(ConfVars.METASTOREURIS, + ConfVars.METASTORE_CONNECTION_DRIVER, + ConfVars.METASTOREWAREHOUSE, + ConfVars.METASTOREPWD, + ConfVars.METASTORE_CONNECTION_USER_NAME, + ConfVars.METASTORECONNECTURLKEY).foreach { key => + sparkContext.hadoopConfiguration + .set(key.varname, metadataHive.getConf(key.varname, key.defaultStrVal)) + } } } From 2939178e5b85ad30fd59f2df8834f266cf8c408a Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 23 Oct 2018 16:58:01 +0800 Subject: [PATCH 2/7] Add test --- .../hive/thriftserver/HiveThriftServer2.scala | 18 ++----- .../sql/hive/thriftserver/SparkSQLEnv.scala | 11 ++--- .../HiveThriftServer2Suites.scala | 49 +++++++++++++++++-- .../org/apache/spark/sql/hive/HiveUtils.scala | 19 +++++++ 4 files changed, 70 insertions(+), 27 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index fcd6ddfa2e96..5247a111ff8a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -55,7 +55,7 @@ object HiveThriftServer2 extends Logging { def startWithContext(sqlContext: SQLContext): Unit = { val server = new HiveThriftServer2(sqlContext) - val executionHive = HiveUtils.newClientForExecution( + val executionHive = HiveUtils.newClientFromHadoopConfig( sqlContext.sparkContext.conf, sqlContext.sessionState.newHadoopConf()) @@ -90,21 +90,9 @@ object HiveThriftServer2 extends Logging { uiTab.foreach(_.detach()) } - val hadoopConf = SparkSQLEnv.sqlContext.sessionState.newHadoopConf() - - val executionHive = HiveUtils.newClientForExecution( + val executionHive = HiveUtils.newClientFromHadoopConfig( SparkSQLEnv.sqlContext.sparkContext.conf, - hadoopConf - ) - - Seq(ConfVars.METASTOREURIS, - ConfVars.METASTORECONNECTURLKEY, - ConfVars.METASTORE_CONNECTION_USER_NAME, - ConfVars.METASTOREPWD, - ConfVars.METASTORE_CONNECTION_DRIVER, - ConfVars.METASTOREWAREHOUSE).foreach { key => - executionHive.conf.set(key.varname, hadoopConf.get(key.varname)) - } + SparkSQLEnv.sqlContext.sessionState.newHadoopConf()) try { val server = new HiveThriftServer2(SparkSQLEnv.sqlContext) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 232c17f0089a..84dbdd3b1ed7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.io.PrintStream -import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.conf.HiveConf import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging @@ -57,14 +57,9 @@ private[hive] object SparkSQLEnv extends Logging { metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) sparkSession.conf.set(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion) - Seq(ConfVars.METASTOREURIS, - ConfVars.METASTORE_CONNECTION_DRIVER, - ConfVars.METASTOREWAREHOUSE, - ConfVars.METASTOREPWD, - ConfVars.METASTORE_CONNECTION_USER_NAME, - ConfVars.METASTORECONNECTURLKEY).foreach { key => + HiveConf.metaVars.foreach { metaKey => sparkContext.hadoopConfiguration - .set(key.varname, metadataHive.getConf(key.varname, key.defaultStrVal)) + .set(metaKey.varname, metadataHive.getConf(metaKey.varname, metaKey.defaultStrVal)) } } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 70eb28cdd0c6..e62a32b9ef70 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -21,7 +21,7 @@ import java.io.{File, FilenameFilter} import java.net.URL import java.nio.charset.StandardCharsets import java.sql.{Date, 
DriverManager, SQLException, Statement} -import java.util.UUID +import java.util.{Properties, UUID} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -32,11 +32,11 @@ import scala.util.{Random, Try} import com.google.common.io.Files import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.jdbc.HiveDriver +import org.apache.hive.jdbc.{HiveConnection, HiveDriver, HiveQueryResultSet, Utils => JdbcUtils} import org.apache.hive.service.auth.PlainSaslHelper import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType} +import org.apache.hive.service.cli.thrift._ import org.apache.hive.service.cli.thrift.TCLIService.Client -import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket import org.scalatest.BeforeAndAfterAll @@ -280,7 +280,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { var defaultV2: String = null var data: ArrayBuffer[Int] = null - withMultipleConnectionJdbcStatement("test_map")( + withMultipleConnectionJdbcStatement("test_map", "db1.test_map2")( // create table { statement => @@ -644,6 +644,47 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { assert(resultSet.getString(1) === "4.56") } } + + test("SPARK-24196: SQL client(DBeaver) can't show tables") { + val testTableName = "test" + withJdbcStatement(testTableName) { statement => + // Create table first + val queries = Seq(s"CREATE TABLE $testTableName(key INT, val STRING)") + queries.foreach(statement.execute) + + val rawTransport = new TSocket("localhost", serverPort) + val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties) + val user = System.getProperty("user.name") + val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport) + val client = new TCLIService.Client(new TBinaryProtocol(transport)) + transport.open() + try { + val openResp = client.OpenSession(new TOpenSessionReq) + val sessHandle = openResp.getSessionHandle + + val getTableReq = new TGetTablesReq(sessHandle) + getTableReq.setTableName("%") + + val getTableResp = client.GetTables(getTableReq) + + JdbcUtils.verifySuccess(getTableResp.getStatus) + + val rs = new HiveQueryResultSet.Builder(connection) + .setClient(client) + .setSessionHandle(sessHandle) + .setStmtHandle(getTableResp.getOperationHandle) + .build() + + assert(rs.next()) + assert(rs.getRow === 1) + assert(rs.getString("TABLE_NAME") === testTableName) + } finally { + connection.close() + transport.close() + rawTransport.close() + } + } + } } class SingleSessionSuite extends HiveThriftJdbcTest { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index cd321d41f43e..d8723f869a41 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -273,6 +273,25 @@ private[spark] object HiveUtils extends Logging { loader.createClient().asInstanceOf[HiveClientImpl] } + /** + * Create a [[HiveClient]] with hadoop configuration for execution. 
+ */ + protected[hive] def newClientFromHadoopConfig( + conf: SparkConf, + hadoopConf: Configuration): HiveClientImpl = { + val configurations = formatTimeVarsForHiveClient(hadoopConf) + logInfo(s"Initializing execution hive with hadoop configuration, version $builtinHiveVersion") + val loader = new IsolatedClientLoader( + version = IsolatedClientLoader.hiveVersion(builtinHiveVersion), + sparkConf = conf, + execJars = Seq.empty, + hadoopConf = hadoopConf, + config = configurations, + isolationOn = false, + baseClassLoader = Utils.getContextOrSparkClassLoader) + loader.createClient().asInstanceOf[HiveClientImpl] + } + /** * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore. * From 8449cdfa5dd08b762881c9680ec0dc64ac43de71 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 24 Oct 2018 16:36:04 +0800 Subject: [PATCH 3/7] Implement GetTablesOperation --- .../hive/thriftserver/HiveThriftServer2.scala | 4 +- .../SparkGetTablesOperation.scala | 123 ++++++++++++++++++ .../sql/hive/thriftserver/SparkSQLEnv.scala | 6 - .../server/SparkSQLOperationManager.scala | 22 +++- .../org/apache/spark/sql/hive/HiveUtils.scala | 19 --- 5 files changed, 144 insertions(+), 30 deletions(-) create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 5247a111ff8a..64d924fdb009 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -55,7 +55,7 @@ object HiveThriftServer2 extends Logging { def startWithContext(sqlContext: SQLContext): Unit = { val server = new HiveThriftServer2(sqlContext) - val executionHive = HiveUtils.newClientFromHadoopConfig( + val executionHive = HiveUtils.newClientForExecution( sqlContext.sparkContext.conf, sqlContext.sessionState.newHadoopConf()) @@ -90,7 +90,7 @@ object HiveThriftServer2 extends Logging { uiTab.foreach(_.detach()) } - val executionHive = HiveUtils.newClientFromHadoopConfig( + val executionHive = HiveUtils.newClientForExecution( SparkSQLEnv.sqlContext.sparkContext.conf, SparkSQLEnv.sqlContext.sessionState.newHadoopConf()) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala new file mode 100644 index 000000000000..811dbb00e19b --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver + +import java.util.{List => JList, Map => JMap, UUID} + +import scala.collection.JavaConverters.seqAsJavaListConverter + +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType +import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils +import org.apache.hive.service.cli._ +import org.apache.hive.service.cli.operation.GetTablesOperation +import org.apache.hive.service.cli.session.HiveSession + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ +import org.apache.spark.sql.catalyst.catalog.SessionCatalog + +private[hive] class SparkGetTablesOperation( + parentSession: HiveSession, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: JList[String]) + (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String]) + extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) + with Logging { + + val catalog: SessionCatalog = sqlContext.sessionState.catalog + + private final val RESULT_SET_SCHEMA = new TableSchema() + .addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.") + .addStringColumn("TABLE_SCHEM", "Schema name.") + .addStringColumn("TABLE_NAME", "Table name.") + .addStringColumn("TABLE_TYPE", "The table type, e.g. \"TABLE\", \"VIEW\", etc.") + .addStringColumn("REMARKS", "Comments about the table.") + + private val rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion) + + private val sparkToClientMapping = Map(EXTERNAL -> "TABLE", MANAGED -> "TABLE", VIEW -> "VIEW") + + if (tableTypes != null) { + this.tableTypes.addAll(tableTypes) + } + + private var statementId: String = _ + + override def close(): Unit = { + logInfo(s"Close get tables with $statementId") + setState(OperationState.CLOSED) + } + + override def getNextRowSet(order: FetchOrientation, maxRows: Long): RowSet = { + validateDefaultFetchOrientation(order) + assertState(OperationState.FINISHED) + setHasResultSet(true) + if (order.equals(FetchOrientation.FETCH_FIRST)) { + rowSet.setStartOffset(0) + } + rowSet.extractSubset(maxRows.toInt) + } + + override def runInternal(): Unit = { + statementId = UUID.randomUUID().toString + logInfo(s"Getting tables with $statementId") + setState(OperationState.RUNNING) + // Always use the latest class loader provided by executionHive's state. 
+ val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + val schemaPattern = convertSchemaPattern(schemaName) + val matchingDbs = catalog.listDatabases(schemaPattern) + + if (isAuthV2Enabled) { + val privObjs = + HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava) + val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" + authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr) + } + + val tablePattern = convertIdentifierPattern(tableName, true) + matchingDbs.foreach { dbName => + catalog.listTables(dbName, tablePattern).foreach { tableIdentifier => + val catalogTable = catalog.getTableMetadata(tableIdentifier) + + val rowData = Array[AnyRef]( + "", + catalogTable.database, + catalogTable.identifier.table, + sparkToClientMapping.get(catalogTable.tableType), + catalogTable.comment) + + if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains( + sparkToClientMapping.get(catalogTable.tableType))) { + rowSet.addRow(rowData) + } + } + } + + setState(OperationState.FINISHED) + } + + override def cancel(): Unit = { + logInfo(s"Cancel get tables with $statementId") + setState(OperationState.CANCELED) + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 84dbdd3b1ed7..8980bcf88558 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.hive.thriftserver import java.io.PrintStream -import org.apache.hadoop.hive.conf.HiveConf - import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} @@ -57,10 +55,6 @@ private[hive] object SparkSQLEnv extends Logging { metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) sparkSession.conf.set(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion) - HiveConf.metaVars.foreach { metaKey => - sparkContext.hadoopConfiguration - .set(metaKey.varname, metadataHive.getConf(metaKey.varname, metaKey.defaultStrVal)) - } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index bf7c01f60fb5..b856665021aa 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -17,17 +17,17 @@ package org.apache.spark.sql.hive.thriftserver.server -import java.util.{Map => JMap} +import java.util.{List => JList, Map => JMap} import java.util.concurrent.ConcurrentHashMap import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} +import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, MetadataOperation, Operation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import 
org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveUtils -import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation} +import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetTablesOperation} import org.apache.spark.sql.internal.SQLConf /** @@ -63,6 +63,22 @@ private[thriftserver] class SparkSQLOperationManager() operation } + override def newGetTablesOperation( + parentSession: HiveSession, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: JList[String]): MetadataOperation = synchronized { + val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) + require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + s" initialized or had already closed.") + val operation = new SparkGetTablesOperation(parentSession, + catalogName, schemaName, tableName, tableTypes)(sqlContext, sessionToActivePool) + handleToOperation.put(operation.getHandle, operation) + logDebug(s"Created GetTablesOperation with session=$parentSession.") + operation + } + def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = { val iterator = confMap.entrySet().iterator() while (iterator.hasNext) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index d8723f869a41..cd321d41f43e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -273,25 +273,6 @@ private[spark] object HiveUtils extends Logging { loader.createClient().asInstanceOf[HiveClientImpl] } - /** - * Create a [[HiveClient]] with hadoop configuration for execution. - */ - protected[hive] def newClientFromHadoopConfig( - conf: SparkConf, - hadoopConf: Configuration): HiveClientImpl = { - val configurations = formatTimeVarsForHiveClient(hadoopConf) - logInfo(s"Initializing execution hive with hadoop configuration, version $builtinHiveVersion") - val loader = new IsolatedClientLoader( - version = IsolatedClientLoader.hiveVersion(builtinHiveVersion), - sparkConf = conf, - execJars = Seq.empty, - hadoopConf = hadoopConf, - config = configurations, - isolationOn = false, - baseClassLoader = Utils.getContextOrSparkClassLoader) - loader.createClient().asInstanceOf[HiveClientImpl] - } - /** * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore. 
* From e9a2a93e47927b26a57b4f62627345ad6cf3d5ed Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 24 Oct 2018 22:59:35 +0800 Subject: [PATCH 4/7] Only correct tableType constructor rowData --- .../SparkGetTablesOperation.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 811dbb00e19b..26700786863f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -98,18 +98,16 @@ private[hive] class SparkGetTablesOperation( matchingDbs.foreach { dbName => catalog.listTables(dbName, tablePattern).foreach { tableIdentifier => val catalogTable = catalog.getTableMetadata(tableIdentifier) - - val rowData = Array[AnyRef]( - "", - catalogTable.database, - catalogTable.identifier.table, - sparkToClientMapping.get(catalogTable.tableType), - catalogTable.comment) - - if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains( - sparkToClientMapping.get(catalogTable.tableType))) { + val tableType = sparkToClientMapping(catalogTable.tableType) + if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) { + val rowData = Array[AnyRef]( + "", + catalogTable.database, + catalogTable.identifier.table, + tableType, + catalogTable.comment.getOrElse("")) rowSet.addRow(rowData) - } + } } } From 80a8d2153ab5c96fdb9fe7cf54f9f8c60cf930b3 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 25 Oct 2018 14:39:33 +0800 Subject: [PATCH 5/7] Add SparkMetadataOperationSuite --- .../SparkGetTablesOperation.scala | 14 ++- .../server/SparkSQLOperationManager.scala | 4 +- .../HiveThriftServer2Suites.scala | 47 +------ .../SparkMetadataOperationSuite.scala | 117 ++++++++++++++++++ 4 files changed, 134 insertions(+), 48 deletions(-) create mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 26700786863f..c280dd14a336 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.thriftserver -import java.util.{List => JList, Map => JMap, UUID} +import java.util.{List => JList, UUID} import scala.collection.JavaConverters.seqAsJavaListConverter @@ -32,13 +32,23 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.SessionCatalog +/** + * Spark's own GetTablesOperation + * + * @param sqlContext SQLContext to use + * @param parentSession a HiveSession from SessionManager + * @param catalogName catalog name. null if not applicable + * @param schemaName database name, null or a concrete database name + * @param tableName table name pattern + * @param tableTypes list of allowed table types, e.g. 
"TABLE", "VIEW" + */ private[hive] class SparkGetTablesOperation( + sqlContext: SQLContext, parentSession: HiveSession, catalogName: String, schemaName: String, tableName: String, tableTypes: JList[String]) - (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String]) extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index b856665021aa..47e1402562b0 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -72,8 +72,8 @@ private[thriftserver] class SparkSQLOperationManager() val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") - val operation = new SparkGetTablesOperation(parentSession, - catalogName, schemaName, tableName, tableTypes)(sqlContext, sessionToActivePool) + val operation = new SparkGetTablesOperation(sqlContext, parentSession, + catalogName, schemaName, tableName, tableTypes) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTablesOperation with session=$parentSession.") operation diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index e62a32b9ef70..edd8990c825b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -21,7 +21,7 @@ import java.io.{File, FilenameFilter} import java.net.URL import java.nio.charset.StandardCharsets import java.sql.{Date, DriverManager, SQLException, Statement} -import java.util.{Properties, UUID} +import java.util.UUID import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -32,11 +32,11 @@ import scala.util.{Random, Try} import com.google.common.io.Files import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.jdbc.{HiveConnection, HiveDriver, HiveQueryResultSet, Utils => JdbcUtils} +import org.apache.hive.jdbc.HiveDriver import org.apache.hive.service.auth.PlainSaslHelper import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType} -import org.apache.hive.service.cli.thrift._ import org.apache.hive.service.cli.thrift.TCLIService.Client +import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket import org.scalatest.BeforeAndAfterAll @@ -644,47 +644,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { assert(resultSet.getString(1) === "4.56") } } - - test("SPARK-24196: SQL client(DBeaver) can't show tables") { - val testTableName = "test" - withJdbcStatement(testTableName) { statement => - // Create table first - val queries = Seq(s"CREATE TABLE $testTableName(key INT, val STRING)") - queries.foreach(statement.execute) - - val rawTransport = new 
TSocket("localhost", serverPort) - val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties) - val user = System.getProperty("user.name") - val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport) - val client = new TCLIService.Client(new TBinaryProtocol(transport)) - transport.open() - try { - val openResp = client.OpenSession(new TOpenSessionReq) - val sessHandle = openResp.getSessionHandle - - val getTableReq = new TGetTablesReq(sessHandle) - getTableReq.setTableName("%") - - val getTableResp = client.GetTables(getTableReq) - - JdbcUtils.verifySuccess(getTableResp.getStatus) - - val rs = new HiveQueryResultSet.Builder(connection) - .setClient(client) - .setSessionHandle(sessHandle) - .setStmtHandle(getTableResp.getOperationHandle) - .build() - - assert(rs.next()) - assert(rs.getRow === 1) - assert(rs.getString("TABLE_NAME") === testTableName) - } finally { - connection.close() - transport.close() - rawTransport.close() - } - } - } } class SingleSessionSuite extends HiveThriftJdbcTest { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala new file mode 100644 index 000000000000..7f87dbf21268 --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.thriftserver + +import java.util.{Arrays => JArrays, List => JList, Properties} + +import org.apache.hive.jdbc.{HiveConnection, HiveQueryResultSet, Utils => JdbcUtils} +import org.apache.hive.service.auth.PlainSaslHelper +import org.apache.hive.service.cli.thrift._ +import org.apache.thrift.protocol.TBinaryProtocol +import org.apache.thrift.transport.TSocket + +class SparkMetadataOperationSuite extends HiveThriftJdbcTest { + + override def mode: ServerMode.Value = ServerMode.binary + + private def withHiveQueryResultSet( + schema: String, + tableNamePattern: String, + tableTypes: JList[String])(f: HiveQueryResultSet => Unit): Unit = { + val rawTransport = new TSocket("localhost", serverPort) + val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties) + val user = System.getProperty("user.name") + val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport) + val client = new TCLIService.Client(new TBinaryProtocol(transport)) + transport.open() + + var rs: HiveQueryResultSet = null + + try { + val openResp = client.OpenSession(new TOpenSessionReq) + val sessHandle = openResp.getSessionHandle + + val getTableReq = new TGetTablesReq(sessHandle) + getTableReq.setSchemaName(schema) + getTableReq.setTableName(tableNamePattern) + getTableReq.setTableTypes(tableTypes) + + val getTableResp = client.GetTables(getTableReq) + + JdbcUtils.verifySuccess(getTableResp.getStatus) + + rs = new HiveQueryResultSet.Builder(connection) + .setClient(client) + .setSessionHandle(sessHandle) + .setStmtHandle(getTableResp.getOperationHandle) + .build() + + f(rs) + } finally { + rs.close() + connection.close() + transport.close() + rawTransport.close() + } + } + + test("Spark's own GetTablesOperation(SparkGetTablesOperation)") { + + def checkResult(tableNames: Seq[String], rs: HiveQueryResultSet): Unit = { + if (tableNames.nonEmpty) { + for (i <- tableNames.indices) { + assert(rs.next()) + assert(rs.getString("TABLE_NAME") === tableNames(i)) + } + } else { + assert(!rs.next()) + } + } + + withJdbcStatement("table1", "table2") { statement => + Seq("CREATE TABLE table1(key INT, val STRING)", + "CREATE TABLE table2(key INT, val STRING)", + "CREATE VIEW view1 AS SELECT * FROM table2").foreach(statement.execute) + + withHiveQueryResultSet("%", "%", null) { rs => + checkResult(Seq("table1", "table2", "view1"), rs) + } + + withHiveQueryResultSet("%", "table1", null) { rs => + checkResult(Seq("table1"), rs) + } + + withHiveQueryResultSet("%", "table_not_exist", null) { rs => + checkResult(Seq.empty, rs) + } + + withHiveQueryResultSet("%", "%", JArrays.asList("TABLE")) { rs => + checkResult(Seq("table1", "table2"), rs) + } + + withHiveQueryResultSet("%", "%", JArrays.asList("VIEW")) { rs => + checkResult(Seq("view1"), rs) + } + + withHiveQueryResultSet("%", "%", JArrays.asList("TABLE", "VIEW")) { rs => + checkResult(Seq("table1", "table2", "view1"), rs) + } + } + } + +} From 4f5cd273c220701c03362eedbbff4425da9f2f29 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 8 Jan 2019 16:34:32 +0800 Subject: [PATCH 6/7] Change RowSet to protected --- .../cli/operation/GetTablesOperation.java | 2 +- .../SparkGetTablesOperation.scala | 52 ++++--------------- .../SparkMetadataOperationSuite.scala | 17 +++--- 3 files changed, 20 insertions(+), 51 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index 1a7ca79163d7..2af17a662a29 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -46,7 +46,7 @@ public class GetTablesOperation extends MetadataOperation { private final String schemaName; private final String tableName; private final List tableTypes = new ArrayList(); - private final RowSet rowSet; + protected final RowSet rowSet; private final TableTypeMapping tableTypeMapping; diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index c280dd14a336..369650047b10 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.thriftserver -import java.util.{List => JList, UUID} +import java.util.{List => JList} import scala.collection.JavaConverters.seqAsJavaListConverter @@ -27,10 +27,9 @@ import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetTablesOperation import org.apache.hive.service.cli.session.HiveSession -import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ -import org.apache.spark.sql.catalyst.catalog.SessionCatalog /** * Spark's own GetTablesOperation @@ -49,51 +48,19 @@ private[hive] class SparkGetTablesOperation( schemaName: String, tableName: String, tableTypes: JList[String]) - extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) - with Logging { - - val catalog: SessionCatalog = sqlContext.sessionState.catalog - - private final val RESULT_SET_SCHEMA = new TableSchema() - .addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.") - .addStringColumn("TABLE_SCHEM", "Schema name.") - .addStringColumn("TABLE_NAME", "Table name.") - .addStringColumn("TABLE_TYPE", "The table type, e.g. \"TABLE\", \"VIEW\", etc.") - .addStringColumn("REMARKS", "Comments about the table.") - - private val rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion) - - private val sparkToClientMapping = Map(EXTERNAL -> "TABLE", MANAGED -> "TABLE", VIEW -> "VIEW") + extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) { if (tableTypes != null) { this.tableTypes.addAll(tableTypes) } - private var statementId: String = _ - - override def close(): Unit = { - logInfo(s"Close get tables with $statementId") - setState(OperationState.CLOSED) - } - - override def getNextRowSet(order: FetchOrientation, maxRows: Long): RowSet = { - validateDefaultFetchOrientation(order) - assertState(OperationState.FINISHED) - setHasResultSet(true) - if (order.equals(FetchOrientation.FETCH_FIRST)) { - rowSet.setStartOffset(0) - } - rowSet.extractSubset(maxRows.toInt) - } - override def runInternal(): Unit = { - statementId = UUID.randomUUID().toString - logInfo(s"Getting tables with $statementId") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + val catalog = sqlContext.sessionState.catalog val schemaPattern = convertSchemaPattern(schemaName) val matchingDbs = catalog.listDatabases(schemaPattern) @@ -108,7 +75,7 @@ private[hive] class SparkGetTablesOperation( matchingDbs.foreach { dbName => catalog.listTables(dbName, tablePattern).foreach { tableIdentifier => val catalogTable = catalog.getTableMetadata(tableIdentifier) - val tableType = sparkToClientMapping(catalogTable.tableType) + val tableType = tableTypeString(catalogTable.tableType) if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) { val rowData = Array[AnyRef]( "", @@ -120,12 +87,13 @@ private[hive] class SparkGetTablesOperation( } } } - setState(OperationState.FINISHED) } - override def cancel(): Unit = { - logInfo(s"Cancel get tables with $statementId") - setState(OperationState.CANCELED) + private def tableTypeString(tableType: CatalogTableType): String = tableType match { + case EXTERNAL | MANAGED => "TABLE" + case VIEW => "VIEW" + case t => + throw new IllegalArgumentException(s"Unknown table type is found at showCreateHiveTable: $t") } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index ed1562f15a4d..bf9982388d6b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -102,7 +102,7 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { } test("Spark's own GetTablesOperation(SparkGetTablesOperation)") { - def testGetSchemasOperation(( + def testGetTablesOperation( schema: String, tableNamePattern: String, tableTypes: JList[String])(f: HiveQueryResultSet => Unit): Unit = { @@ -155,31 +155,32 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { } withJdbcStatement("table1", "table2") { statement => - Seq("CREATE TABLE table1(key INT, val STRING)", + Seq( + "CREATE TABLE table1(key INT, val STRING)", "CREATE TABLE table2(key INT, val STRING)", "CREATE VIEW view1 AS SELECT * FROM table2").foreach(statement.execute) - testGetSchemasOperation("%", "%", null) { rs => + testGetTablesOperation("%", "%", null) { rs => checkResult(Seq("table1", "table2", "view1"), rs) } - testGetSchemasOperation("%", "table1", null) { rs => + testGetTablesOperation("%", "table1", null) { rs => checkResult(Seq("table1"), rs) } - testGetSchemasOperation("%", "table_not_exist", null) { rs => + testGetTablesOperation("%", "table_not_exist", null) { rs => checkResult(Seq.empty, rs) } - testGetSchemasOperation("%", "%", JArrays.asList("TABLE")) { rs => + testGetTablesOperation("%", "%", JArrays.asList("TABLE")) { rs => checkResult(Seq("table1", "table2"), rs) } - testGetSchemasOperation("%", "%", JArrays.asList("VIEW")) { rs => + testGetTablesOperation("%", "%", JArrays.asList("VIEW")) { rs => checkResult(Seq("view1"), rs) } - testGetSchemasOperation("%", "%", JArrays.asList("TABLE", "VIEW")) { rs => + testGetTablesOperation("%", "%", JArrays.asList("TABLE", "VIEW")) { rs => checkResult(Seq("table1", "table2", "view1"), rs) } } From fb7e0a5a0a42aeb6ae12749762297efe1cd6bce4 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 8 Jan 2019 
17:06:15 +0800 Subject: [PATCH 7/7] remove s --- .../sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 614592d901fb..7947d1785a8f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -84,7 +84,7 @@ private[thriftserver] class SparkSQLOperationManager() tableTypes: JList[String]): MetadataOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + - s" initialized or had already closed.") + " initialized or had already closed.") val operation = new SparkGetTablesOperation(sqlContext, parentSession, catalogName, schemaName, tableName, tableTypes) handleToOperation.put(operation.getHandle, operation)
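
In its final shape the series replaces Hive's built-in GetTablesOperation with SparkGetTablesOperation, which answers JDBC GetTables requests directly from Spark's SessionCatalog rather than the embedded execution Hive metastore, so SQL clients such as DBeaver can list schemas, tables and views again (SPARK-24196). The snippet below is an illustrative sketch of how an ordinary JDBC client exercises that code path once a Thrift server is running; it is not part of the patches, and the URL, user name and schema pattern are placeholder assumptions. The column names follow the standard JDBC getTables result schema (TABLE_SCHEM, TABLE_NAME, TABLE_TYPE).

  // Illustrative sketch only (not part of this series). Assumes a Thrift server is
  // listening on localhost:10000; host, port and credentials are placeholders.
  import java.sql.DriverManager

  object ListTablesExample {
    def main(args: Array[String]): Unit = {
      Class.forName("org.apache.hive.jdbc.HiveDriver")
      val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
      try {
        // DatabaseMetaData.getTables is served by SparkGetTablesOperation after this change.
        val rs = conn.getMetaData.getTables(null, "%", "%", Array("TABLE", "VIEW"))
        while (rs.next()) {
          println(s"${rs.getString("TABLE_SCHEM")}.${rs.getString("TABLE_NAME")}" +
            s" (${rs.getString("TABLE_TYPE")})")
        }
      } finally {
        conn.close()
      }
    }
  }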