-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-17502] [SQL] Fix Multiple Bugs in DDL Statements on Temporary Views #15054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
cc47c3e
855df61
61749b7
60371d8
6029a95
7c1a8e5
a01f6b3
333497a
5e40880
f305c4c
c662d2c
48ce44e
ee3096c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -246,27 +246,32 @@ class SessionCatalog( | |
| } | ||
|
|
||
| /** | ||
| * Retrieve the metadata of an existing metastore table. | ||
| * If no database is specified, assume the table is in the current database. | ||
| * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown. | ||
| * Retrieve the metadata of an existing temporary view. | ||
| * If the temporary view does not exist, a [[NoSuchTempViewException]] is thrown. | ||
| */ | ||
| def getTempViewMetadata(name: String): CatalogTable = { | ||
|
||
| val table = formatTableName(name) | ||
| if (!tempTables.contains(table)) { | ||
| throw new NoSuchTempViewException(table) | ||
| } | ||
| CatalogTable( | ||
| identifier = TableIdentifier(table), | ||
| tableType = CatalogTableType.VIEW, | ||
| storage = CatalogStorageFormat.empty, | ||
| schema = tempTables(table).output.toStructType) | ||
| } | ||
|
|
||
| /** | ||
| * Retrieve the metadata of an existing permanent table/view. If no database is specified, | ||
| * assume the table/view is in the current database. If the specified table/view is not found | ||
| * in the database then a [[NoSuchTableException]] is thrown. | ||
| */ | ||
| def getTableMetadata(name: TableIdentifier): CatalogTable = { | ||
| val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(name.table) | ||
| val tid = TableIdentifier(table) | ||
| if (isTemporaryTable(name)) { | ||
| CatalogTable( | ||
| identifier = tid, | ||
| tableType = CatalogTableType.VIEW, | ||
| storage = CatalogStorageFormat.empty, | ||
| schema = tempTables(table).output.toStructType, | ||
| properties = Map(), | ||
| viewText = None) | ||
| } else { | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Some(db))) | ||
| externalCatalog.getTable(db, table) | ||
| } | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Some(db))) | ||
| externalCatalog.getTable(db, table) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -520,11 +525,11 @@ class SessionCatalog( | |
| tableName: TableIdentifier, | ||
| parts: Seq[CatalogTablePartition], | ||
| ignoreIfExists: Boolean): Unit = { | ||
| requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Option(db))) | ||
| requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) | ||
| externalCatalog.createPartitions(db, table, parts, ignoreIfExists) | ||
| } | ||
|
|
||
|
|
@@ -537,11 +542,11 @@ class SessionCatalog( | |
| specs: Seq[TablePartitionSpec], | ||
| ignoreIfNotExists: Boolean, | ||
| purge: Boolean): Unit = { | ||
| requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Option(db))) | ||
| requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName)) | ||
| externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge) | ||
| } | ||
|
|
||
|
|
@@ -556,12 +561,12 @@ class SessionCatalog( | |
| specs: Seq[TablePartitionSpec], | ||
| newSpecs: Seq[TablePartitionSpec]): Unit = { | ||
| val tableMetadata = getTableMetadata(tableName) | ||
| requireExactMatchedPartitionSpec(specs, tableMetadata) | ||
| requireExactMatchedPartitionSpec(newSpecs, tableMetadata) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Option(db))) | ||
| requireExactMatchedPartitionSpec(specs, tableMetadata) | ||
| requireExactMatchedPartitionSpec(newSpecs, tableMetadata) | ||
| externalCatalog.renamePartitions(db, table, specs, newSpecs) | ||
| } | ||
|
|
||
|
|
@@ -575,11 +580,11 @@ class SessionCatalog( | |
| * this becomes a no-op. | ||
| */ | ||
| def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { | ||
| requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Option(db))) | ||
| requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) | ||
| externalCatalog.alterPartitions(db, table, parts) | ||
| } | ||
|
|
||
|
|
@@ -588,11 +593,11 @@ class SessionCatalog( | |
| * If no database is specified, assume the table is in the current database. | ||
| */ | ||
| def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = { | ||
| requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Option(db))) | ||
| requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName)) | ||
| externalCatalog.getPartition(db, table, spec) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ import scala.util.control.NonFatal | |
| import org.apache.hadoop.fs.{FileSystem, Path} | ||
|
|
||
| import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} | ||
| import org.apache.spark.sql.catalyst.plans.logical.Statistics | ||
|
|
@@ -37,7 +38,9 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend | |
| override def run(sparkSession: SparkSession): Seq[Row] = { | ||
| val sessionState = sparkSession.sessionState | ||
| val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) | ||
| val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) | ||
| val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) | ||
| val qualifiedName = TableIdentifier(tableIdent.table, Some(db)) | ||
|
||
| val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(qualifiedName)) | ||
|
|
||
| relation match { | ||
| case relation: CatalogRelation => | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @yhuai do you remember why we have this check?
`InsertIntoTable` can only be used for a table, right? When will we hit this branch? Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spark.sql( s"""CREATE TEMPORARY VIEW normal_orc_source |USING org.apache.spark.sql.hive.orc |OPTIONS ( | PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}' |) """.stripMargin) sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5") We allow users to insert rows into temporary views that are created using
`CREATE TEMPORARY VIEW`. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the temporary view is created by `createOrReplaceTempView`, users are unable to insert rows into the temporary view. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can insert data into a temp view? Is it by design? cc @rxin @marmbrus @yhuai
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this might be designed for JDBC data sources at the beginning. Just a guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably dates back to when we called these temp tables and when we didn't have persistent data source tables. Given that I think we probably have to support this for compatibility. I'm not sure why this isn't a whitelist though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the reason is that `LogicalRelation` is not in the package `sql/catalyst`. Otherwise, it should be like