Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor aliasedPlan.
  • Loading branch information
jiangxb1987 committed Jan 18, 2017
commit e2ccdd5689274e44289017822a2bf81de32dbd37
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,12 @@ case class CreateViewCommand(
// This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
verifyTemporaryObjectsNotExists(sparkSession)

val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
case (attr, (colName, None)) => Alias(attr, colName)()
case (attr, (colName, Some(colComment))) =>
val meta = new MetadataBuilder().putString("comment", colComment).build()
Alias(attr, colName)(explicitMetadata = Some(meta))
}
sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}

val catalog = sparkSession.sessionState.catalog
if (viewType == LocalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (catalog.tableExists(name)) {
val tableMetadata = catalog.getTableMetadata(name)
Expand All @@ -165,7 +155,7 @@ case class CreateViewCommand(
throw new AnalysisException(s"$name is not a view")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
catalog.alterTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan))
catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
Expand All @@ -175,7 +165,7 @@ case class CreateViewCommand(
}
} else {
// Create the view if it doesn't exist.
catalog.createTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan),
catalog.createTable(prepareTable(sparkSession, analyzedPlan),
ignoreIfExists = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put them in one line

}
Seq.empty[Row]
Expand Down Expand Up @@ -209,26 +199,36 @@ case class CreateViewCommand(
}
}

/**
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
* else return the analyzed plan directly.
*/
private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
case (attr, (colName, None)) => Alias(attr, colName)()
case (attr, (colName, Some(colComment))) =>
val meta = new MetadataBuilder().putString("comment", colComment).build()
Alias(attr, colName)(explicitMetadata = Some(meta))
}
session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
}

/**
* Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific
* properties(e.g. view default database, view query output column names) and store them as
* properties in the CatalogTable, and also creates the proper schema for the view.
*
* @param session the spark session.
* @param aliasedPlan if `userSpecifiedColumns` is defined, the aliased plan outputs the user
* specified columns, else it is the same as the `analyzedPlan`.
* @param analyzedPlan the analyzed logical plan that represents the child of a view.
* @return a CatalogTable that can be used to save in the catalog.
*/
private def prepareTable(
session: SparkSession,
aliasedPlan: LogicalPlan,
analyzedPlan: LogicalPlan): CatalogTable = {
private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
if (originalText.isEmpty) {
throw new AnalysisException(
"It is not allowed to create a persisted view from the Dataset API")
}

val aliasedPlan = aliasPlan(session, analyzedPlan)
val newProperties = generateViewProperties(properties, session, analyzedPlan)

CatalogTable(
Expand Down Expand Up @@ -305,14 +305,14 @@ object ViewHelper {
/**
* Generate the view default database in `properties`.
*/
def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
Map(VIEW_DEFAULT_DATABASE -> databaseName)
}

/**
* Generate the view query output column names in `properties`.
*/
def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
val props = new mutable.HashMap[String, String]
if (columns.nonEmpty) {
props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
Expand All @@ -326,7 +326,7 @@ object ViewHelper {
/**
* Remove the view query output column names in `properties`.
*/
def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
// We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
// while `CatalogTable` should be serializable.
properties.filterNot { case (key, _) =>
Expand Down