Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Change RowSet to protected
  • Loading branch information
wangyum committed Jan 8, 2019
commit 4f5cd273c220701c03362eedbbff4425da9f2f29
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class GetTablesOperation extends MetadataOperation {
private final String schemaName;
private final String tableName;
private final List<String> tableTypes = new ArrayList<String>();
private final RowSet rowSet;
protected final RowSet rowSet;
private final TableTypeMapping tableTypeMapping;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.{List => JList, UUID}
import java.util.{List => JList}

import scala.collection.JavaConverters.seqAsJavaListConverter

Expand All @@ -27,10 +27,9 @@ import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTablesOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

/**
* Spark's own GetTablesOperation
Expand All @@ -49,51 +48,19 @@ private[hive] class SparkGetTablesOperation(
schemaName: String,
tableName: String,
tableTypes: JList[String])
extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
with Logging {

val catalog: SessionCatalog = sqlContext.sessionState.catalog

private final val RESULT_SET_SCHEMA = new TableSchema()
.addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.")
.addStringColumn("TABLE_SCHEM", "Schema name.")
.addStringColumn("TABLE_NAME", "Table name.")
.addStringColumn("TABLE_TYPE", "The table type, e.g. \"TABLE\", \"VIEW\", etc.")
.addStringColumn("REMARKS", "Comments about the table.")

private val rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion)

private val sparkToClientMapping = Map(EXTERNAL -> "TABLE", MANAGED -> "TABLE", VIEW -> "VIEW")
extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) {

if (tableTypes != null) {
this.tableTypes.addAll(tableTypes)
}

private var statementId: String = _

override def close(): Unit = {
logInfo(s"Close get tables with $statementId")
setState(OperationState.CLOSED)
}

override def getNextRowSet(order: FetchOrientation, maxRows: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
if (order.equals(FetchOrientation.FETCH_FIRST)) {
rowSet.setStartOffset(0)
}
rowSet.extractSubset(maxRows.toInt)
}

override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
logInfo(s"Getting tables with $statementId")
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

val catalog = sqlContext.sessionState.catalog
val schemaPattern = convertSchemaPattern(schemaName)
val matchingDbs = catalog.listDatabases(schemaPattern)

Expand All @@ -108,7 +75,7 @@ private[hive] class SparkGetTablesOperation(
matchingDbs.foreach { dbName =>
catalog.listTables(dbName, tablePattern).foreach { tableIdentifier =>
val catalogTable = catalog.getTableMetadata(tableIdentifier)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be very slow for big schemas. Calling getTableMetadata on every table will trigger 3 separate database calls to the metastore (requireDbExists, requireTableExists, and getTable) taking ~tens of ms for every table. So it can be tens of seconds for schemas with hundreds of tables.

The underlying Hive Thriftserver GetTables uses MetastoreClient.getTableObjectsByName (https://hive.apache.org/javadocs/r2.1.1/api/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.html#getTableObjectsByName-java.lang.String-java.util.List-) call to bulk-list the tables, but we don't expose that through our SessionCatalog / ExternalCatalog / HiveClientImpl

Would it be possible to thread that bulk getTableObjectsByName operation through our catalog APIs, to be able to retrieve the tables efficiently here? @wangyum @gatorsmile - what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LantaoJin @wangyum Could either of you submit a PR to resolve the issue raised by @juliuszsompolski ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will take this issue.

val tableType = sparkToClientMapping(catalogTable.tableType)
val tableType = tableTypeString(catalogTable.tableType)
if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) {
val rowData = Array[AnyRef](
"",
Expand All @@ -120,12 +87,13 @@ private[hive] class SparkGetTablesOperation(
}
}
}

setState(OperationState.FINISHED)
}

override def cancel(): Unit = {
logInfo(s"Cancel get tables with $statementId")
setState(OperationState.CANCELED)
private def tableTypeString(tableType: CatalogTableType): String = tableType match {
case EXTERNAL | MANAGED => "TABLE"
case VIEW => "VIEW"
case t =>
throw new IllegalArgumentException(s"Unknown table type is found at showCreateHiveTable: $t")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
}

test("Spark's own GetTablesOperation(SparkGetTablesOperation)") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def testGetSchemasOperation((
def testGetTablesOperation(
schema: String,
tableNamePattern: String,
tableTypes: JList[String])(f: HiveQueryResultSet => Unit): Unit = {
Expand Down Expand Up @@ -155,31 +155,32 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
}

withJdbcStatement("table1", "table2") { statement =>
Seq("CREATE TABLE table1(key INT, val STRING)",
Seq(
"CREATE TABLE table1(key INT, val STRING)",
"CREATE TABLE table2(key INT, val STRING)",
"CREATE VIEW view1 AS SELECT * FROM table2").foreach(statement.execute)

testGetSchemasOperation("%", "%", null) { rs =>
testGetTablesOperation("%", "%", null) { rs =>
checkResult(Seq("table1", "table2", "view1"), rs)
}

testGetSchemasOperation("%", "table1", null) { rs =>
testGetTablesOperation("%", "table1", null) { rs =>
checkResult(Seq("table1"), rs)
}

testGetSchemasOperation("%", "table_not_exist", null) { rs =>
testGetTablesOperation("%", "table_not_exist", null) { rs =>
checkResult(Seq.empty, rs)
}

testGetSchemasOperation("%", "%", JArrays.asList("TABLE")) { rs =>
testGetTablesOperation("%", "%", JArrays.asList("TABLE")) { rs =>
checkResult(Seq("table1", "table2"), rs)
}

testGetSchemasOperation("%", "%", JArrays.asList("VIEW")) { rs =>
testGetTablesOperation("%", "%", JArrays.asList("VIEW")) { rs =>
checkResult(Seq("view1"), rs)
}

testGetSchemasOperation("%", "%", JArrays.asList("TABLE", "VIEW")) { rs =>
testGetTablesOperation("%", "%", JArrays.asList("TABLE", "VIEW")) { rs =>
checkResult(Seq("table1", "table2", "view1"), rs)
}
}
Expand Down