Skip to content
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
gatorsmile committed Sep 15, 2016
commit a01f6b39f2073390ebf2445a1eca1c3925d24841
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class NoSuchDatabaseException(db: String) extends AnalysisException(s"Database '
class NoSuchTableException(db: String, table: String)
extends AnalysisException(s"Table or view '$table' not found in database '$db'")

class NoSuchTempViewException(table: String)
extends AnalysisException(s"Temporary view '$table' not found")

class NoSuchPartitionException(
db: String,
table: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,36 +246,27 @@ class SessionCatalog(
}

/**
* Retrieve the metadata of an existing metastore table/view or a temporary view.
* If no database is specified, we check whether the corresponding temporary view exists.
* If the temporary view does not exist, we assume the table/view is in the current database.
* If still not found in the database then a [[NoSuchTableException]] is thrown.
* Retrieve the metadata of an existing temporary view.
* If the temporary view does not exist, a [[NoSuchTempViewException]] is thrown.
*/
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
val tid = TableIdentifier(table)
if (isTemporaryTable(name)) {
CatalogTable(
identifier = tid,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = tempTables(table).output.toStructType,
properties = Map(),
viewText = None)
} else {
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
def getTempViewMetadata(name: String): CatalogTable = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move these 2 methods to the right section: handles only temp view

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do we need this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

: ) Just removed getTempViewMetadata and moved getTempViewMetadataOption to the position you want

val table = formatTableName(name)
if (!tempTables.contains(table)) {
throw new NoSuchTempViewException(table)
}
CatalogTable(
identifier = TableIdentifier(table),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = tempTables(table).output.toStructType)
}

/**
* Retrieve the metadata of an existing permanent table/view. If no database is specified,
* assume the table/view is in the current database. If the specified table/view is not found
* in the database then a [[NoSuchTableException]] is thrown.
*/
def getNonTempTableMetadata(name: TableIdentifier): CatalogTable = {
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
}

test("getTableMetadata on temporary views") {
test("getTableMetadata and getTempViewMetadata on temporary views") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10)
val m = intercept[AnalysisException] {
Expand All @@ -457,9 +457,16 @@ class SessionCatalogSuite extends SparkFunSuite {
}.getMessage
assert(m2.contains("Table or view 'view1' not found in database 'default'"))

intercept[NoSuchTempViewException] {
catalog.getTempViewMetadata("view1")
}.getMessage

catalog.createTempView("view1", tempTable, overrideIfExists = false)
assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table == "view1")
assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name == "id")
assert(catalog.getTempViewMetadata("view1").identifier === TableIdentifier("view1"))

intercept[NoSuchTableException] {
catalog.getTableMetadata(TableIdentifier("view1"))
}.getMessage

val m3 = intercept[AnalysisException] {
catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ case class AlterTableUnsetPropertiesCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, we can always pass in a table identifier with database


if (!ifExists) {
propKeys.foreach { k =>
Expand Down Expand Up @@ -305,7 +305,7 @@ case class AlterTableSerDePropertiesCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
// For datasource tables, disallow setting serde or specifying partition
if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
Expand Down Expand Up @@ -354,7 +354,7 @@ case class AlterTableAddPartitionCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
Expand Down Expand Up @@ -414,7 +414,7 @@ case class AlterTableDropPartitionCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
Expand Down Expand Up @@ -468,7 +468,7 @@ case class AlterTableRecoverPartitionsCommand(

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
val qualifiedName = table.identifier.quotedString

if (DDLUtils.isDatasourceTable(table)) {
Expand Down Expand Up @@ -645,7 +645,7 @@ case class AlterTableSetLocationCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ case class CreateTableLikeCommand(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}

val sourceTableDesc = catalog.getTableMetadata(sourceTable)
val sourceTableDesc = if (catalog.isTemporaryTable(sourceTable)) {
catalog.getTempViewMetadata(sourceTable.table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not atomic, we should have a method that return temp view metadata or None.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right. Let me fix it.

} else {
catalog.getTableMetadata(sourceTable)
}

// Storage format
val newStorage =
Expand Down Expand Up @@ -176,7 +180,11 @@ case class AlterTableRenameCommand(
}
}
// For datasource tables, we also need to update the "path" serde property
val table = catalog.getTableMetadata(oldName)
val table = if (catalog.isTemporaryTable(oldName)) {
catalog.getTempViewMetadata(oldName.table)
} else {
catalog.getTableMetadata(oldName)
}
if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
val newPath = catalog.defaultTablePath(newTblName)
val newTable = table.withNewStorage(
Expand Down Expand Up @@ -214,7 +222,7 @@ case class LoadDataCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val targetTable = catalog.getNonTempTableMetadata(table)
val targetTable = catalog.getTableMetadata(table)
val qualifiedName = targetTable.identifier.quotedString

if (targetTable.tableType == CatalogTableType.VIEW) {
Expand Down Expand Up @@ -333,7 +341,7 @@ case class TruncateTableCommand(

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
val qualifiedName = table.identifier.quotedString

if (table.tableType == CatalogTableType.EXTERNAL) {
Expand Down Expand Up @@ -592,13 +600,19 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("col_name", StringType, nullable = false)() :: Nil
}

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
val catalog = sparkSession.sessionState.catalog
val table = if (catalog.isTemporaryTable(tableName)) {
catalog.getTempViewMetadata(tableName.table)
} else {
catalog.getTableMetadata(tableName)
}
table.schema.map { c =>
Row(c.name)
}
}
Expand Down Expand Up @@ -634,7 +648,7 @@ case class ShowPartitionsCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getNonTempTableMetadata(tableName)
val table = catalog.getTableMetadata(tableName)
val qualifiedName = table.identifier.quotedString

/**
Expand Down Expand Up @@ -686,9 +700,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val db = table.database.getOrElse(catalog.getCurrentDatabase)
val qualifiedName = TableIdentifier(table.table, Some(db))
val tableMetadata = catalog.getTableMetadata(qualifiedName)
val tableMetadata = catalog.getTableMetadata(table)

// TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}

private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
val tableMetadata = if (sessionCatalog.isTemporaryTable(tableIdentifier)) {
sessionCatalog.getTempViewMetadata(tableIdentifier.table)
} else {
sessionCatalog.getTableMetadata(tableIdentifier)
}

val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
val columns = tableMetadata.schema.map { c =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,7 @@ class HiveDDLSuite
.createTempView(sourceViewName)
sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")

val sourceTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(sourceViewName, None))
val sourceTable = spark.sessionState.catalog.getTempViewMetadata(sourceViewName)
val targetTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(targetTabName, Some("default")))

Expand Down