Commit: "fix"
gatorsmile committed Jan 1, 2018
commit 8ae8f1832a62caf10a62511f339402c0d94f89ea
@@ -73,18 +73,22 @@ statement
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
- (OPTIONS options=tablePropertyList)?
- (PARTITIONED BY partitionColumnNames=identifierList)?
- bucketSpec? locationSpec?
- (COMMENT comment=STRING)?
- (TBLPROPERTIES tableProps=tablePropertyList)?
+ ((OPTIONS options=tablePropertyList) |
+ (PARTITIONED BY partitionColumnNames=identifierList) |
+ bucketSpec |
+ locationSpec |
+ (COMMENT comment=STRING) |
+ (TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?
- (COMMENT comment=STRING)?
- (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
- bucketSpec? skewSpec?
- rowFormat? createFileFormat? locationSpec?
- (TBLPROPERTIES tablePropertyList)?
+ ((COMMENT comment=STRING) |
+ (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
+ bucketSpec |
+ skewSpec |
+ rowFormat |
+ createFileFormat |
+ locationSpec |
+ (TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier locationSpec? #createTableLike
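The point of the grammar change above: the clause list after tableProvider becomes an unordered repetition, so the clauses may appear in any order, and the grammar alone no longer limits each clause to a single occurrence; duplicate detection moves into the AST builder (see the checkDuplicateClauses helper below). A minimal sketch of what now parses (hedged: parser stands for any ParserInterface implementation built on this grammar; table names and paths are illustrative):

  // COMMENT ahead of PARTITIONED BY ahead of OPTIONS violated the old
  // fixed clause order; the new (clause | ...)* form accepts it.
  parser.parsePlan(
    """CREATE TABLE t (a INT, b STRING) USING parquet
      |COMMENT 'order-insensitive clauses'
      |PARTITIONED BY (a)
      |OPTIONS (path '/tmp/t')""".stripMargin)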
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.catalyst.parser

+ import java.util
+
import scala.collection.mutable.StringBuilder

import org.antlr.v4.runtime.{ParserRuleContext, Token}
@@ -39,6 +41,17 @@ object ParserUtils
throw new ParseException(s"Operation not allowed: $message", ctx)
}

+ def duplicateClausesNotAllowed(message: String, ctx: ParserRuleContext): Nothing = {
+   throw new ParseException(s"Found duplicate clauses: $message", ctx)

Review comment (Member):
Can't we merge these two functions into one that checks for the duplication? e.g.,

  def checkDuplicateClauses[T](nodes: util.List[T], clauseName: String, ctx: ParserRuleContext): Unit = {
    if (nodes.size() > 1) {
      throw new ParseException(s"Found duplicate clauses: $clauseName", ctx)
    }
  }

Reply (Member Author):
Sounds good to me!

+ }
+
+ def checkDuplicateClauses(
+     nodes: util.List[TerminalNode], clauseName: String, ctx: ParserRuleContext): Unit = {
+   if (nodes.size() > 1) {
+     throw new ParseException(s"Found duplicate clauses: $clauseName", ctx)
+   }
+ }

/** Check if duplicate keys exist in a set of key-value pairs. */
def checkDuplicateKeys[T](keyPairs: Seq[(String, T)], ctx: ParserRuleContext): Unit = {
keyPairs.groupBy(_._1).filter(_._2.size > 1).foreach { case (key, _) =>
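Behaviorally, a repeated clause now fails fast in the builder rather than at the grammar level. A test-style sketch of the expected error (hedged: intercept is ScalaTest's, and parser is as in the earlier snippet):

  // The second OPTIONS clause parses, but visitCreateTable rejects it.
  val e = intercept[ParseException] {
    parser.parsePlan(
      """CREATE TABLE t (a INT) USING parquet
        |OPTIONS (path '/tmp/t1')
        |OPTIONS (path '/tmp/t2')""".stripMargin)
  }
  assert(e.getMessage.contains("Found duplicate clauses: OPTIONS"))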
@@ -384,22 +384,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
* USING table_provider
* [OPTIONS table_property_list]
- * [PARTITIONED BY (col_name, col_name, ...)]
- * [CLUSTERED BY (col_name, col_name, ...)
- *  [SORTED BY (col_name [ASC|DESC], ...)]
- *  INTO num_buckets BUCKETS
- * ]
- * [LOCATION path]
- * [COMMENT table_comment]
- * [TBLPROPERTIES (property_name=property_value, ...)]
+ * create_table_clauses
* [[AS] select_statement];
+ *
+ * create_table_clauses (order insensitive):
+ * [PARTITIONED BY (col_name, col_name, ...)]

Review comment (Member):
Isn't [OPTIONS table_property_list] one of the create_table_clauses?

Reply (Member Author):
Forgot it.

+ * [CLUSTERED BY (col_name, col_name, ...)
+ *  [SORTED BY (col_name [ASC|DESC], ...)]
+ *  INTO num_buckets BUCKETS
+ * ]
+ * [LOCATION path]
+ * [COMMENT table_comment]
+ * [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
}

+ checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+ checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+ checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+ checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
val schema = Option(ctx.colTypeList()).map(createSchema)
@@ -408,9 +417,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
- val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+ val bucketSpec = if (ctx.bucketSpec().size > 1) {
+   duplicateClausesNotAllowed("CLUSTERED BY", ctx)

Review comment (Contributor):
Can you split the validation logic from the extraction logic? In this case I'd move the check to line 411 and do the extraction on line 420.

Reply (Member Author):
Sure
+ } else {
+   ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+ }

- val location = Option(ctx.locationSpec).map(visitLocationSpec)
+ val location = if (ctx.locationSpec.size > 1) {
+   duplicateClausesNotAllowed("LOCATION", ctx)
+ } else {
+   ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+ }
val storage = DataSource.buildStorageFormatFromOptions(options)

if (location.isDefined && storage.locationUri.isDefined) {
@@ -1087,13 +1104,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* {{{
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1[:] data_type [COMMENT col_comment], ...)]
- * [COMMENT table_comment]
- * [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
- * [ROW FORMAT row_format]
- * [STORED AS file_format]
- * [LOCATION path]
- * [TBLPROPERTIES (property_name=property_value, ...)]
+ * create_table_clauses
* [AS select_statement];
+ *
+ * create_table_clauses (order insensitive):
+ * [COMMENT table_comment]
+ * [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
+ * [ROW FORMAT row_format]
+ * [STORED AS file_format]
+ * [LOCATION path]
+ * [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
Expand All @@ -1104,28 +1124,48 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
"CREATE TEMPORARY TABLE is not supported yet. " +
"Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
}
- if (ctx.skewSpec != null) {
+ if (ctx.skewSpec.size > 0) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
}

+ checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+ checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+ checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+
val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
- val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
+ val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)

Review comment (Contributor):
What is the meaning of ctx.tableProps now? The union of all TBLPROPERTIES lists?

Reply (Member Author):
The last one, if we have multiple clauses. However, we block this in the checks above.
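In other words, once the checks above run, a statement that repeats TBLPROPERTIES never reaches that last-one-wins extraction. A test-style sketch under the same assumptions as the earlier snippets:

  // The duplicate is rejected before ctx.tableProps is ever consulted.
  val e = intercept[ParseException] {
    parser.parsePlan(
      """CREATE TABLE h (a INT) STORED AS parquet
        |TBLPROPERTIES ('k' = '1')
        |TBLPROPERTIES ('k' = '2')""".stripMargin)
  }
  assert(e.getMessage.contains("Found duplicate clauses: TBLPROPERTIES"))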

val selectQuery = Option(ctx.query).map(plan)
- val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+ val bucketSpec = if (ctx.bucketSpec().size > 1) {
+   duplicateClausesNotAllowed("CLUSTERED BY", ctx)
+ } else {
+   ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+ }

// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
val schema = StructType(dataCols ++ partitionCols)

// Storage format
val defaultStorage = HiveSerDe.getDefaultStorage(conf)
- validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
- val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
-   .getOrElse(CatalogStorageFormat.empty)
- val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
-   .getOrElse(CatalogStorageFormat.empty)
- val location = Option(ctx.locationSpec).map(visitLocationSpec)
+ validateRowFormatFileFormat(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
+ val fileStorage = if (ctx.createFileFormat.size > 1) {
+   duplicateClausesNotAllowed("STORED AS/BY", ctx)
+ } else {
+   ctx.createFileFormat.asScala.headOption.map(visitCreateFileFormat)
+     .getOrElse(CatalogStorageFormat.empty)
+ }
+ val rowStorage = if (ctx.rowFormat.size > 1) {
+   duplicateClausesNotAllowed("ROW FORMAT", ctx)
+ } else {
+   ctx.rowFormat.asScala.headOption.map(visitRowFormat)
+     .getOrElse(CatalogStorageFormat.empty)
+ }
+ val location = if (ctx.locationSpec.size > 1) {
+   duplicateClausesNotAllowed("LOCATION", ctx)
+ } else {
+   ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+ }
// If we are creating an EXTERNAL table, then the LOCATION field is required
if (external && location.isEmpty) {
operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
@@ -1366,6 +1406,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
}

+ private def validateRowFormatFileFormat(
+     rowFormatCtx: Seq[RowFormatContext],
+     createFileFormatCtx: Seq[CreateFileFormatContext],
+     parentCtx: ParserRuleContext): Unit = {
+   if (rowFormatCtx.size == 1 && createFileFormatCtx.size == 1) {
+     validateRowFormatFileFormat(rowFormatCtx.head, createFileFormatCtx.head, parentCtx)

Review comment (Contributor):
Shall we just combine this method and the old validateRowFormatFileFormat?

Reply (Member Author):
Will do it in a follow-up PR.
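For the record, one possible shape for that follow-up, sketched only: the Seq-based wrapper and the pairwise overload fold into a single method, with the existing pairwise compatibility checks (elided here) inlined at the marked spot:

  // Hypothetical combined form; the elided body is the old overload's logic.
  private def validateRowFormatFileFormat(
      rowFormatCtx: Seq[RowFormatContext],
      createFileFormatCtx: Seq[CreateFileFormatContext],
      parentCtx: ParserRuleContext): Unit = {
    (rowFormatCtx, createFileFormatCtx) match {
      case (Seq(rowFormat), Seq(fileFormat)) =>
        // ... inline the existing ROW FORMAT / STORED AS compatibility checks ...
      case _ =>
        // Nothing to cross-validate; duplicate clauses were rejected earlier.
    }
  }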

+   }
+ }

/**
* Create or replace a view. This creates a [[CreateViewCommand]] command.
*