-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19024][SQL] Implement new approach to write a permanent view #16613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
917ca04
9d582a4
2d49ef2
e2ccdd5
7c5b6af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -165,7 +165,7 @@ case class CreateViewCommand( | |
| throw new AnalysisException(s"$name is not a view") | ||
| } else if (replace) { | ||
| // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` | ||
| catalog.alterTable(prepareTable(sparkSession, aliasedPlan)) | ||
| catalog.alterTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan)) | ||
| } else { | ||
| // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already | ||
| // exists. | ||
|
|
@@ -175,7 +175,8 @@ case class CreateViewCommand( | |
| } | ||
| } else { | ||
| // Create the view if it doesn't exist. | ||
| catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false) | ||
| catalog.createTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan), | ||
| ignoreIfExists = false) | ||
| } | ||
| Seq.empty[Row] | ||
| } | ||
|
|
@@ -212,14 +213,23 @@ case class CreateViewCommand( | |
| * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific | ||
| * properties(e.g. view default database, view query output column names) and store them as | ||
| * properties in the CatalogTable, and also creates the proper schema for the view. | ||
| * | ||
| * @param session the spark session. | ||
| * @param aliasedPlan if `userSpecifiedColumns` is defined, the aliased plan outputs the user | ||
| * specified columns, else it is the same as the `analyzedPlan`. | ||
| * @param analyzedPlan the analyzed logical plan that represents the child of a view. | ||
|
||
| * @return a CatalogTable that can be used to save in the catalog. | ||
| */ | ||
| private def prepareTable(session: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = { | ||
| private def prepareTable( | ||
| session: SparkSession, | ||
| aliasedPlan: LogicalPlan, | ||
| analyzedPlan: LogicalPlan): CatalogTable = { | ||
| if (originalText.isEmpty) { | ||
| throw new AnalysisException( | ||
| "It is not allowed to create a persisted view from the Dataset API") | ||
| } | ||
|
|
||
| val newProperties = generateViewProperties(properties, session, originalText.get) | ||
| val newProperties = generateViewProperties(properties, session, analyzedPlan) | ||
|
|
||
| CatalogTable( | ||
| identifier = name, | ||
|
|
@@ -276,7 +286,7 @@ case class AlterViewAsCommand( | |
| throw new AnalysisException(s"${viewMeta.identifier} is not a view.") | ||
| } | ||
|
|
||
| val newProperties = generateViewProperties(viewMeta.properties, session, originalText) | ||
| val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan) | ||
|
|
||
| val updatedViewMeta = viewMeta.copy( | ||
| schema = analyzedPlan.schema, | ||
|
|
@@ -332,30 +342,17 @@ object ViewHelper { | |
| * | ||
| * @param properties the `properties` in CatalogTable. | ||
| * @param session the spark session. | ||
| * @param viewText the query string used to create the child plan on view resolution. | ||
| * @param analyzedPlan the analyzed logical plan that represents the child of a view. | ||
| * @return new view properties including view default database and query column names properties. | ||
| */ | ||
| def generateViewProperties( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like all other methods in this class can be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea, will update that. |
||
| properties: Map[String, String], | ||
| session: SparkSession, | ||
| viewText: String): Map[String, String] = { | ||
| // Try to analyze the viewText, throw an AnalysisException if the query is invalid. | ||
| val queryPlan = try { | ||
| val unresolvedPlan = session.sessionState.sqlParser.parsePlan(viewText) | ||
| val resolvedPlan = session.sessionState.analyzer.execute(unresolvedPlan) | ||
| session.sessionState.analyzer.checkAnalysis(resolvedPlan) | ||
|
|
||
| resolvedPlan | ||
| } catch { | ||
| case e: AnalysisException => | ||
| throw new AnalysisException(s"Failed to analyze the view query SQL: $viewText", | ||
| cause = Some(e)) | ||
| } | ||
|
|
||
| analyzedPlan: LogicalPlan): Map[String, String] = { | ||
| // Generate the query column names, throw an AnalysisException if there exists duplicate column | ||
| // names. | ||
| val queryOutput = queryPlan.schema.fieldNames | ||
| assert(queryOutput.toSet.size == queryOutput.size, | ||
| val queryOutput = analyzedPlan.schema.fieldNames | ||
| assert(queryOutput.distinct.size == queryOutput.size, | ||
| s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column name.") | ||
|
|
||
| // Generate the view default database name. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: put them in one line