@@ -360,6 +360,7 @@ trait CheckAnalysis extends PredicateHelper {

case InsertIntoTable(t, _, _, _, _)
Contributor:

cc @yhuai do you remember why we have this check? InsertIntoTable can only be used for tables, right? When will we hit this branch?

Member Author (@gatorsmile, Sep 15, 2016):

    spark.sql(
      s"""CREATE TEMPORARY VIEW normal_orc_source
         |USING org.apache.spark.sql.hive.orc
         |OPTIONS (
         |  PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
         |)
       """.stripMargin)
    sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")

We allow users to insert rows into temporary views that are created using CREATE TEMPORARY VIEW.

Member Author (@gatorsmile):

If the temporary view is created by createOrReplaceTempView, users are unable to insert rows into the temporary view.
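For example, a minimal sketch of the disallowed case (dataset_temp_view is a hypothetical name; this assumes the same test context as the snippet above, plus spark.implicits._ for toDF and ScalaTest's intercept):

    Seq((1, "a"), (2, "b")).toDF("intField", "stringField")
      .createOrReplaceTempView("dataset_temp_view")

    // The view is backed by a LocalRelation/RDD-based plan, so the check above
    // rejects the INSERT: "Inserting into an RDD-based table is not allowed."
    intercept[AnalysisException] {
      sql("INSERT INTO TABLE dataset_temp_view SELECT * FROM orc_temp_table WHERE intField > 5")
    }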

Contributor:

We can insert data into a temp view? Is it by design? cc @rxin @marmbrus @yhuai

Member Author (@gatorsmile):

I guess this might have been designed for JDBC data sources originally. Just a guess.

Contributor:

This probably dates back to when we called these temp tables and when we didn't have persistent data source tables. Given that, I think we probably have to support this for compatibility. I'm not sure why this isn't a whitelist, though?

Member Author (@gatorsmile):

I guess the reason is that LogicalRelation is not in the sql/catalyst package. Otherwise, it would look like:

          case InsertIntoTable(t, _, _, _, _)
            if !t.isInstanceOf[SimpleCatalogRelation] && !t.isInstanceOf[LogicalRelation] =>
            failAnalysis(s"Inserting into an RDD-based table is not allowed.")

  if !t.isInstanceOf[LeafNode] ||
    t.isInstanceOf[Range] ||
    t == OneRowRelation ||
    t.isInstanceOf[LocalRelation] =>
    failAnalysis(s"Inserting into an RDD-based table is not allowed.")
@@ -246,9 +246,10 @@ class SessionCatalog(
}

/**
* Retrieve the metadata of an existing metastore table.
* If no database is specified, assume the table is in the current database.
* If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
* Retrieve the metadata of an existing metastore table/view or a temporary view.
* If no database is specified, we check whether the corresponding temporary view exists.
* If the temporary view does not exist, we assume the table/view is in the current database.
* If it is still not found in the database, a [[NoSuchTableException]] is thrown.
*/
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
@@ -269,6 +270,19 @@
}
}

/**
* Retrieve the metadata of an existing permanent table/view. If no database is specified,
* assume the table/view is in the current database. If the specified table/view is not found
* in the database then a [[NoSuchTableException]] is thrown.
*/
def getNonTempTableMetadata(name: TableIdentifier): CatalogTable = {
Member Author (@gatorsmile, Sep 14, 2016):

@cloud-fan In your PR #14962, getTableMetadata will not deal with temporary tables. To simplify the potential conflict resolution, I just introduced a new function, getNonTempTableMetadata. Thus, you can basically just replace it with the new getTableMetadata. Does that sound OK to you? Sorry for any inconvenience this might cause.

Contributor:

I don't think getTableMetadata should handle temp views; maybe we can fix it in this PR?

Member Author (@gatorsmile):

Sure, will do it soon.

val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
}
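A rough illustration of the difference between the two lookups as of this diff (the identifier and catalog value here are hypothetical; the temp-view behavior of getTableMetadata follows the updated doc above, which is still under discussion in this thread):

    // Hypothetical: "t" exists both as a temporary view and as a permanent table
    // in the current database.
    val ident = TableIdentifier("t")
    catalog.getTableMetadata(ident)         // resolves the temporary view first
    catalog.getNonTempTableMetadata(ident)  // skips temp views and resolves the permanent
                                            // table, or throws NoSuchTableException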

/**
* Retrieve the metadata of an existing metastore table.
* If no database is specified, assume the table is in the current database.
@@ -520,11 +534,11 @@
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}

@@ -537,11 +551,11 @@
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
}

@@ -556,12 +570,12 @@
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
val tableMetadata = getTableMetadata(tableName)
requireExactMatchedPartitionSpec(specs, tableMetadata)
requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(specs, tableMetadata)
requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
externalCatalog.renamePartitions(db, table, specs, newSpecs)
}

@@ -575,11 +589,11 @@
* this becomes a no-op.
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
externalCatalog.alterPartitions(db, table, parts)
}

@@ -588,11 +602,11 @@
* If no database is specified, assume the table is in the current database.
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
externalCatalog.getPartition(db, table, spec)
}

@@ -22,6 +22,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -37,7 +38,9 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val qualifiedName = TableIdentifier(tableIdent.table, Some(db))
Contributor:

Let's follow the existing convention and call it tableIdentWithDB.

Member Author (@gatorsmile):

Done. Cleaned up all the related naming issues.

val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(qualifiedName))

relation match {
case relation: CatalogRelation =>
@@ -265,12 +265,13 @@ case class AlterTableUnsetPropertiesCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
val table = catalog.getTableMetadata(tableName)
val table = catalog.getNonTempTableMetadata(tableName)

if (!ifExists) {
propKeys.foreach { k =>
if (!table.properties.contains(k)) {
throw new AnalysisException(
s"Attempted to unset non-existent property '$k' in table '$tableName'")
s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
}
}
}
@@ -304,7 +305,7 @@ case class AlterTableSerDePropertiesCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val table = catalog.getNonTempTableMetadata(tableName)
// For datasource tables, disallow setting serde or specifying partition
if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
Expand All @@ -322,11 +323,11 @@ case class AlterTableSerDePropertiesCommand(
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
val part = catalog.getPartition(tableName, spec)
val part = catalog.getPartition(table.identifier, spec)
val newPart = part.copy(storage = part.storage.copy(
serde = serdeClassName.orElse(part.storage.serde),
properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
catalog.alterPartitions(tableName, Seq(newPart))
catalog.alterPartitions(table.identifier, Seq(newPart))
}
Seq.empty[Row]
}
@@ -353,7 +354,7 @@

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val table = catalog.getNonTempTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
Expand All @@ -362,7 +363,7 @@ case class AlterTableAddPartitionCommand(
// inherit table storage format (possibly except for location)
CatalogTablePartition(spec, table.storage.copy(locationUri = location))
}
catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
Seq.empty[Row]
}

@@ -413,12 +414,12 @@ case class AlterTableDropPartitionCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val table = catalog.getNonTempTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
}
catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
Seq.empty[Row]
}

@@ -467,25 +468,20 @@ case class AlterTableRecoverPartitionsCommand(

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
}
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on temporary tables: $tableName")
}
val table = catalog.getTableMetadata(tableName)
val table = catalog.getNonTempTableMetadata(tableName)
val qualifiedName = table.identifier.quotedString

if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableName")
s"Operation not allowed: $cmd on datasource tables: $qualifiedName")
}
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
s"Operation not allowed: $cmd only works on partitioned tables: $qualifiedName")
}
if (table.storage.locationUri.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on table with location provided: $tableName")
s"Operation not allowed: $cmd only works on table with location provided: $qualifiedName")
}

val root = new Path(table.storage.locationUri.get)
@@ -649,11 +645,11 @@ case class AlterTableSetLocationCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val table = catalog.getNonTempTableMetadata(tableName)
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(tableName, spec)
val part = catalog.getPartition(table.identifier, spec)
val newPart =
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
Expand All @@ -662,7 +658,7 @@ case class AlterTableSetLocationCommand(
} else {
part.copy(storage = part.storage.copy(locationUri = Some(location)))
}
catalog.alterPartitions(tableName, Seq(newPart))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
val newTable =