Changes from 1 commit (27 commits in total)
1e647ee Share code to check column name duplication (maropu, Apr 25, 2017)
4467077 Apply reviews (maropu, Jun 13, 2017)
33ab217 Make code more consistent (maropu, Jun 13, 2017)
d8efb9d Apply review comments (maropu, Jun 16, 2017)
11d1818 Apply xiao's reviews (maropu, Jun 16, 2017)
22e1e4f Apply more of xiao's reviews (maropu, Jun 17, 2017)
743a069 Replace map with foreach (maropu, Jun 20, 2017)
f6eab2d Add tests for data schema + partition schema (maropu, Jun 20, 2017)
09da8d6 Drop name duplication checks in HiveMetastoreCatalog.scala (maropu, Jun 20, 2017)
6d03f31 Modify exception messages (maropu, Jun 20, 2017)
a0b9b05 Revert logic to check name duplication (maropu, Jun 20, 2017)
91b6424 Add tests for write paths (maropu, Jun 21, 2017)
37ad3f3 Add tests for stream sink paths (maropu, Jun 21, 2017)
d0d9d3e Brush up code and add more tests (maropu, Jun 25, 2017)
cbe9c71 Apply reviews (maropu, Jun 26, 2017)
c69270f Apply more comments (maropu, Jun 27, 2017)
af959f6 Add more tests in create.sql (maropu, Jun 27, 2017)
8d3e10a Move duplication checks into constructor (maropu, Jun 29, 2017)
9b386d5 Brush up code (maropu, Jun 30, 2017)
a878510 [WIP] Add DataSourceValidator trait to validate schema in write path (maropu, Jul 3, 2017)
be20127 Revert "Brush up code" (maropu, Jul 3, 2017)
f41bf80 Fix more issues (maropu, Jul 4, 2017)
0526391 Revert DataSourceValidator (maropu, Jul 4, 2017)
9e199bc Add the check for external relation providers (maropu, Jul 4, 2017)
1ae132d [WIP] Handle DataSource name duplication in one place (maropu, Jul 5, 2017)
5c29a75 Fix more (maropu, Jul 6, 2017)
5ed2c0d Move some tests to DDLSuite (maropu, Jul 7, 2017)
Brush up code
maropu committed Jul 6, 2017
commit 9b386d56fd2ee12278bc2cfc4166a4eef0b9919c
SessionCatalog.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias,
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.SchemaUtils
 
 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
@@ -342,10 +341,6 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(tableIdentifier)
 
-    SchemaUtils.checkSchemaColumnNameDuplication(
-      newSchema, "in the table definition of " + tableIdentifier.identifier,
-      conf.caseSensitiveAnalysis)
-
     val catalogTable = externalCatalog.getTable(db, table)
     val oldSchema = catalogTable.schema
 
views.scala
@@ -21,13 +21,11 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{Alias, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.types.MetadataBuilder
-import org.apache.spark.sql.util.SchemaUtils
 
 
 /**
@@ -124,28 +122,19 @@ case class CreateViewCommand(
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = sparkSession.sessionState.executePlan(child)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
-
     if (userSpecifiedColumns.nonEmpty &&
-        userSpecifiedColumns.length != analyzedPlan.output.length) {
+        userSpecifiedColumns.length != child.output.length) {
       throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
-        s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
+        s"(num: `${child.output.length}`) does not match the number of column names " +
         s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
     }
 
-    // When creating a permanent view, not allowed to reference temporary objects.
-    // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
-    verifyTemporaryObjectsNotExists(sparkSession)
Review comment from maropu (member, author):

I moved `verifyTemporaryObjectsNotExists` into the analyzer rules because `qe.assertAnalyzed()` is called in the rules, so the plan passed to `run()` here is already resolved.


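For reference, a minimal sketch of the relocation described in that comment. The object and helper names are assumptions for illustration; the traversal itself mirrors the `verifyTemporaryObjectsNotExists` method removed further down, and the point is that an analyzer rule can invoke it while the view query is still unresolved:

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object TempObjectChecks {
  // Hypothetical standalone helper: the same traversal as the removed
  // verifyTemporaryObjectsNotExists, but callable from an analyzer rule while
  // `plan` is still unresolved, so temporary views and functions still appear
  // as Unresolved* nodes.
  def verifyNoTemporaryObjects(catalog: SessionCatalog, plan: LogicalPlan): Unit = {
    plan.collect {
      // Reject references to temporary views.
      case r: UnresolvedRelation if catalog.isTemporaryTable(r.tableIdentifier) =>
        throw new AnalysisException(
          s"Not allowed to reference a temporary view ${r.tableIdentifier}")
      // Reject temporary functions hiding in still-unresolved expressions.
      case other if !other.resolved =>
        other.expressions.flatMap(_.collect {
          case f: UnresolvedFunction if catalog.isTemporaryFunction(f.name) =>
            throw new AnalysisException(
              s"Not allowed to reference a temporary function `${f.name}`")
        })
    }
  }
}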
     val catalog = sparkSession.sessionState.catalog
     if (viewType == LocalTempView) {
-      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, child)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
-      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, child)
       catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -155,14 +144,11 @@ case class CreateViewCommand(
     } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
       throw new AnalysisException(s"$name is not a view")
     } else if (replace) {
-      // Detect cyclic view reference on CREATE OR REPLACE VIEW.
-      val viewIdent = tableMetadata.identifier
-      checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
Review comment from maropu (member, author):

To keep the existing tests passing, I moved `checkCyclicViewReference` into the analyzer rules as well. Since the duplication checks also catch cyclic cases, we need to check for cycles first and only then check for name duplication.


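A minimal sketch of that ordering, assuming the two helpers shown elsewhere in this diff (`ViewHelper.checkCyclicViewReference` and `SchemaUtils.checkColumnNameDuplication`) are in scope; the wrapper object and method name are illustrative, not the commit's actual code:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.util.SchemaUtils

object ViewChecks {
  // Hypothetical wrapper: check for reference cycles before duplicate names,
  // because a cyclic plan can also trip the duplicate-name check and would
  // then be reported with the wrong, less specific error.
  def validateViewOutput(plan: LogicalPlan, viewIdent: TableIdentifier, resolver: Resolver): Unit = {
    ViewHelper.checkCyclicViewReference(plan, Seq(viewIdent), viewIdent) // 1) cycles first
    SchemaUtils.checkColumnNameDuplication( // 2) then duplicate names
      plan.schema.fieldNames, "the view", resolver)
  }
}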
       // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
       // Nothing we need to retain from the old view, so just drop and create a new one
+      val viewIdent = tableMetadata.identifier
       catalog.dropTable(viewIdent, ignoreIfNotExists = false, purge = false)
-      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, child), ignoreIfExists = false)
     } else {
       // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
       // exists.
@@ -172,39 +158,11 @@ case class CreateViewCommand(
       }
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, child), ignoreIfExists = false)
     }
     Seq.empty[Row]
   }

-  /**
-   * Permanent views are not allowed to reference temp objects, including temp function and views
-   */
-  private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = {
-    if (!isTemporary) {
-      // This func traverses the unresolved plan `child`. Below are the reasons:
-      // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding
-      // logical plan. After replacement, it is impossible to detect whether the SubqueryAlias is
-      // added/generated from a temporary view.
-      // 2) The temp functions are represented by multiple classes. Most are inaccessible from this
-      // package (e.g., HiveGenericUDF).
-      child.collect {
-        // Disallow creating permanent views based on temporary views.
-        case s: UnresolvedRelation
-          if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
-          throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
-            s"referencing a temporary view ${s.tableIdentifier}")
-        case other if !other.resolved => other.expressions.flatMap(_.collect {
-          // Disallow creating permanent views based on temporary UDFs.
-          case e: UnresolvedFunction
-            if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) =>
-            throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
-              s"referencing a temporary function `${e.name}`")
-        })
-      }
-    }
-  }
-
   /**
    * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
    * else return the analyzed plan directly.
@@ -270,15 +228,10 @@ case class AlterViewAsCommand(
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(session: SparkSession): Seq[Row] = {
-    // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = session.sessionState.executePlan(query)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
-
-    if (session.sessionState.catalog.alterTempViewDefinition(name, analyzedPlan)) {
+    if (session.sessionState.catalog.alterTempViewDefinition(name, query)) {
       // a local/global temp view has been altered, we are done.
     } else {
-      alterPermanentView(session, analyzedPlan)
+      alterPermanentView(session, query)
     }
 
     Seq.empty[Row]
@@ -290,10 +243,6 @@
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }
 
-    // Detect cyclic view reference on ALTER VIEW.
-    val viewIdent = viewMeta.identifier
-    checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
-
     val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)
 
     val updatedViewMeta = viewMeta.copy(
@@ -356,66 +305,12 @@ object ViewHelper {
       properties: Map[String, String],
       session: SparkSession,
       analyzedPlan: LogicalPlan): Map[String, String] = {
-    val queryOutput = analyzedPlan.schema.fieldNames
-
-    // Generate the query column names, throw an AnalysisException if there exists duplicate column
-    // names.
-    SchemaUtils.checkColumnNameDuplication(
-      queryOutput, "the view", session.sessionState.conf.resolver)
-
     // Generate the view default database name.
+    val queryOutput = analyzedPlan.schema.fieldNames
     val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
 
     removeQueryColumnNames(properties) ++
       generateViewDefaultDatabase(viewDefaultDatabase) ++
       generateQueryColumnNames(queryOutput)
   }
 
-  /**
-   * Recursively search the logical plan to detect cyclic view references, throw an
-   * AnalysisException if cycle detected.
-   *
-   * A cyclic view reference is a cycle of reference dependencies, for example, if the following
-   * statements are executed:
-   * CREATE VIEW testView AS SELECT id FROM tbl
-   * CREATE VIEW testView2 AS SELECT id FROM testView
-   * ALTER VIEW testView AS SELECT * FROM testView2
-   * The view `testView` references `testView2`, and `testView2` also references `testView`,
-   * therefore a reference cycle (testView -> testView2 -> testView) exists.
-   *
-   * @param plan the logical plan we detect cyclic view references from.
-   * @param path the path between the altered view and current node.
-   * @param viewIdent the table identifier of the altered view, we compare two views by the
-   *                  `desc.identifier`.
-   */
-  def checkCyclicViewReference(
-      plan: LogicalPlan,
-      path: Seq[TableIdentifier],
-      viewIdent: TableIdentifier): Unit = {
-    plan match {
-      case v: View =>
-        val ident = v.desc.identifier
-        val newPath = path :+ ident
-        // If the table identifier equals to the `viewIdent`, current view node is the same with
-        // the altered view. We detect a view reference cycle, should throw an AnalysisException.
-        if (ident == viewIdent) {
-          throw new AnalysisException(s"Recursive view $viewIdent detected " +
-            s"(cycle: ${newPath.mkString(" -> ")})")
-        } else {
-          v.children.foreach { child =>
-            checkCyclicViewReference(child, newPath, viewIdent)
-          }
-        }
-      case _ =>
-        plan.children.foreach(child => checkCyclicViewReference(child, path, viewIdent))
-    }
-
-    // Detect cyclic references from subqueries.
-    plan.expressions.foreach { expr =>
-      expr match {
-        case s: SubqueryExpression =>
-          checkCyclicViewReference(s.plan, path, viewIdent)
-        case _ => // Do nothing.
-      }
-    }
-  }
}
DataSource.scala
@@ -352,9 +352,6 @@ case class DataSource(
           "It must be specified manually")
       }
 
-      SchemaUtils.checkSchemaColumnNameDuplication(
-        dataSchema, "in the datasource", sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
       HadoopFsRelation(
         fileCatalog,
         partitionSchema = fileCatalog.partitionSchema,
InsertIntoHadoopFsRelationCommand.scala
@@ -63,13 +63,6 @@ case class InsertIntoHadoopFsRelationCommand(
 
   override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
     assert(children.length == 1)
-
-    // Most formats don't do well with duplicate columns, so lets not allow that
-    SchemaUtils.checkSchemaColumnNameDuplication(
-      query.schema,
-      s"when inserting into $outputPath",
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
PartitioningUtils.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.SchemaUtils
 
 // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
 
@@ -302,9 +301,6 @@ object PartitioningUtils {
       normalizedKey -> value
     }
 
-    SchemaUtils.checkColumnNameDuplication(
-      normalizedPartSpec.map(_._1), "in the partition specification", resolver)
-
     normalizedPartSpec.toMap
   }
 