Closed
Changes from 1 commit
59 commits
9bbfe6e
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'Par…
fjh100456 Dec 25, 2017
48cf108
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'Par…
fjh100456 Dec 25, 2017
5dbd3ed
spark.sql.parquet.compression.codec[SPARK-21786][SQL] When acquiring …
fjh100456 Dec 25, 2017
5124f1b
spark.sql.parquet.compression.codec[SPARK-21786][SQL] When acquiring …
fjh100456 Dec 25, 2017
6907a3e
Make compression codec take effect in hive table writing.
fjh100456 Dec 25, 2017
67e40d4
Modify test
fjh100456 Dec 25, 2017
e2526ca
Separate the pr
fjh100456 Dec 26, 2017
8ae86ee
Add test case with the table containing mixed compression codec
fjh100456 Dec 26, 2017
94ac716
Revert back
fjh100456 Dec 26, 2017
43e041f
Revert back
fjh100456 Dec 26, 2017
ee0c558
Add a new line at the end of file
fjh100456 Dec 26, 2017
e9f705d
Fix scala style
fjh100456 Jan 2, 2018
d3aa7a0
Fix scala style
fjh100456 Jan 2, 2018
5244aaf
[SPARK-22897][CORE] Expose stageAttemptId in TaskContext
advancedxy Jan 2, 2018
b96a213
[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.
juliuszsompolski Jan 3, 2018
a05e85e
[SPARK-22934][SQL] Make optional clauses order insensitive for CREATE…
gatorsmile Jan 3, 2018
b962488
[SPARK-20236][SQL] dynamic partition overwrite
cloud-fan Jan 3, 2018
27c949d
[SPARK-22932][SQL] Refactor AnalysisContext
gatorsmile Jan 2, 2018
79f7263
[SPARK-22896] Improvement in String interpolation
chetkhatri Jan 3, 2018
a51212b
[SPARK-20960][SQL] make ColumnVector public
cloud-fan Jan 3, 2018
f51c8fd
[SPARK-22944][SQL] improve FoldablePropagation
cloud-fan Jan 4, 2018
1860a43
[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, t…
felixcheung Jan 4, 2018
a7cfd6b
[SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent
yaooqinn Jan 4, 2018
eb99b8a
[SPARK-22945][SQL] add java UDF APIs in the functions object
cloud-fan Jan 4, 2018
1f5e354
[SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
gatorsmile Jan 4, 2018
bcfeef5
[SPARK-22771][SQL] Add a missing return statement in Concat.checkInpu…
maropu Jan 4, 2018
cd92913
[SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for ex…
jerryshao Jan 4, 2018
bc4bef4
[SPARK-22850][CORE] Ensure queued events are delivered to all event q…
Jan 4, 2018
2ab4012
[SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Jan 4, 2018
84707f0
[SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-…
liyinan926 Jan 4, 2018
ea9da61
[SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly.
Jan 5, 2018
158f7e6
[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
juliuszsompolski Jan 5, 2018
145820b
[SPARK-22825][SQL] Fix incorrect results of Casting Array to String
maropu Jan 5, 2018
5b524cc
[SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed…
MrBago Jan 5, 2018
f9dcdbc
[SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode
liyinan926 Jan 5, 2018
fd4e304
[SPARK-22961][REGRESSION] Constant columns should generate QueryPlanC…
adrian-ionescu Jan 5, 2018
0a30e93
[SPARK-22940][SQL] HiveExternalCatalogVersionsSuite should succeed on…
bersprockets Jan 5, 2018
d1f422c
[SPARK-13030][ML] Follow-up cleanups for OneHotEncoderEstimator
jkbradley Jan 5, 2018
55afac4
[SPARK-22914][DEPLOY] Register history.ui.port
gerashegalov Jan 6, 2018
bf85301
[SPARK-22937][SQL] SQL elt output binary for binary inputs
maropu Jan 6, 2018
3e3e938
[SPARK-22960][K8S] Revert use of ARG base_image in images
liyinan926 Jan 6, 2018
7236914
[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs…
icexelloss Jan 6, 2018
e6449e8
[SPARK-22793][SQL] Memory leak in Spark Thrift Server
Jan 6, 2018
0377755
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'Par…
fjh100456 Jan 6, 2018
b66700a
[SPARK-22901][PYTHON][FOLLOWUP] Adds the doc for asNondeterministic f…
HyukjinKwon Jan 6, 2018
f9e7b0c
[HOTFIX] Fix style checking failure
gatorsmile Jan 6, 2018
285d342
[SPARK-22973][SQL] Fix incorrect results of Casting Map to String
maropu Jan 7, 2018
bd1a80a
Merge remote-tracking branch 'upstream/branch-2.3'
fjh100456 Jan 8, 2018
584cdc2
Merge pull request #2 from apache/master
fjh100456 Jan 8, 2018
5b150bc
Fix test issue
fjh100456 Jan 8, 2018
2337edd
Merge pull request #1 from apache/master
fjh100456 Jan 8, 2018
43e7eb5
Merge branch 'master' of https://github.com/fjh100456/spark
fjh100456 Jan 9, 2018
4b89b44
consider the precedence of `hive.exec.compress.output`
fjh100456 Jan 11, 2018
6cf32e0
Resume to private and add public function
fjh100456 Jan 19, 2018
365c5bf
Resume to private and add public function
fjh100456 Jan 19, 2018
99271d6
Fix test issue
fjh100456 Jan 19, 2018
2b9dfbe
Fix test issue
fjh100456 Jan 20, 2018
5b5e1df
Fix style issue
fjh100456 Jan 20, 2018
118f788
Fix style issue
fjh100456 Jan 20, 2018
[SPARK-22934][SQL] Make optional clauses order insensitive for CREATE TABLE SQL statement

## What changes were proposed in this pull request?
Currently, our CREATE TABLE syntax requires its clauses in one exact order, which is hard to remember. This PR therefore makes the optional clauses order insensitive for the `CREATE TABLE` SQL statement.

```
CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1 col_type1 [COMMENT col_comment1], ...)]
    USING datasource
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
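
As a hedged illustration (the table name, path, and property are made up), the following statement parses only with this change, because `PARTITIONED BY` appears after `LOCATION` and `COMMENT`; it assumes an active SparkSession `spark`:

```
// Sketch only: with order-insensitive clauses, this parses even though
// PARTITIONED BY follows LOCATION and COMMENT. Names are illustrative.
spark.sql(
  """
    |CREATE TABLE t(a INT, b STRING)
    |USING parquet
    |LOCATION '/tmp/t'
    |COMMENT 'demo table'
    |PARTITIONED BY (b)
    |TBLPROPERTIES ('k' = 'v')
  """.stripMargin)
```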

The same idea also applies to the Hive `CREATE TABLE` syntax.
```
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1[:] col_type1 [COMMENT col_comment1], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
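
Likewise for Hive tables, a hedged sketch (illustrative names; assumes a Hive-enabled SparkSession `spark`): the clause order below, with `STORED AS` before `COMMENT` and `TBLPROPERTIES` before `LOCATION`, was rejected by the old fixed-order grammar:

```
// Sketch only: this ordering is accepted once the Hive clauses become
// order insensitive. Names and path are illustrative.
spark.sql(
  """
    |CREATE EXTERNAL TABLE h(a INT)
    |STORED AS parquet
    |COMMENT 'demo hive table'
    |TBLPROPERTIES ('k' = 'v')
    |LOCATION '/tmp/h'
  """.stripMargin)
```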

## How was this patch tested?
Added test cases

Author: gatorsmile <[email protected]>

Closes #20133 from gatorsmile/createDataSourceTableDDL.

(cherry picked from commit 1a87a16)
Signed-off-by: gatorsmile <[email protected]>
gatorsmile committed Jan 3, 2018
commit a05e85ecb76091567a26a3a14ad0879b4728addc
`sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4`

```
@@ -73,18 +73,22 @@ statement
     | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList   #setDatabaseProperties
     | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?    #dropDatabase
     | createTableHeader ('(' colTypeList ')')? tableProvider
-        (OPTIONS options=tablePropertyList)?
-        (PARTITIONED BY partitionColumnNames=identifierList)?
-        bucketSpec? locationSpec?
-        (COMMENT comment=STRING)?
-        (TBLPROPERTIES tableProps=tablePropertyList)?
+        ((OPTIONS options=tablePropertyList) |
+        (PARTITIONED BY partitionColumnNames=identifierList) |
+        bucketSpec |
+        locationSpec |
+        (COMMENT comment=STRING) |
+        (TBLPROPERTIES tableProps=tablePropertyList))*
         (AS? query)?                                                 #createTable
     | createTableHeader ('(' columns=colTypeList ')')?
-        (COMMENT comment=STRING)?
-        (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
-        bucketSpec? skewSpec?
-        rowFormat? createFileFormat? locationSpec?
-        (TBLPROPERTIES tablePropertyList)?
+        ((COMMENT comment=STRING) |
+        (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
+        bucketSpec |
+        skewSpec |
+        rowFormat |
+        createFileFormat |
+        locationSpec |
+        (TBLPROPERTIES tableProps=tablePropertyList))*
         (AS? query)?                                                 #createHiveTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
         LIKE source=tableIdentifier locationSpec?                    #createTableLike
```
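
The grammar change replaces a fixed sequence of optional clauses (`clause1? clause2? ...`) with a repeated alternation (`(clause1 | clause2 | ...)*`): any order now parses, but so do repetitions, which is why the AST builder below rejects duplicates explicitly. A hedged, test-suite-style sketch of the resulting behavior (assumes a SparkSession `spark`, ScalaTest's `intercept`, and Spark's `ParseException` on the classpath):

```
// Sketch only: a repeated clause now parses syntactically but is rejected
// semantically with "Found duplicate clauses: COMMENT".
intercept[ParseException] {
  spark.sql("CREATE TABLE t(a INT) USING parquet COMMENT 'x' COMMENT 'y'")
}
```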
`sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala`

```
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.catalyst.parser
 
+import java.util
+
 import scala.collection.mutable.StringBuilder
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
@@ -39,6 +41,13 @@ object ParserUtils {
     throw new ParseException(s"Operation not allowed: $message", ctx)
   }
 
+  def checkDuplicateClauses[T](
+      nodes: util.List[T], clauseName: String, ctx: ParserRuleContext): Unit = {
+    if (nodes.size() > 1) {
+      throw new ParseException(s"Found duplicate clauses: $clauseName", ctx)
+    }
+  }
+
   /** Check if duplicate keys exist in a set of key-value pairs. */
   def checkDuplicateKeys[T](keyPairs: Seq[(String, T)], ctx: ParserRuleContext): Unit = {
     keyPairs.groupBy(_._1).filter(_._2.size > 1).foreach { case (key, _) =>
```
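
ANTLR returns a `java.util.List` for a sub-rule that can match repeatedly, which is why the helper takes a `util.List[T]`. A minimal runnable sketch of the same check, simplified to drop the ANTLR `ParserRuleContext`/`ParseException` types (names otherwise as in the patch):

```
// Simplified, standalone version of the duplicate-clause check.
import java.util

def checkDuplicateClauses[T](nodes: util.List[T], clauseName: String): Unit = {
  // More than one occurrence of the same clause is an error.
  if (nodes.size() > 1) {
    throw new IllegalArgumentException(s"Found duplicate clauses: $clauseName")
  }
}

val clauses = util.Arrays.asList("COMMENT 'x'", "COMMENT 'y'")
checkDuplicateClauses(clauses, "COMMENT")  // throws: Found duplicate clauses: COMMENT
```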
`sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala`

```
@@ -383,23 +383,34 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * {{{
    *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    *   USING table_provider
-   *   [OPTIONS table_property_list]
-   *   [PARTITIONED BY (col_name, col_name, ...)]
-   *   [CLUSTERED BY (col_name, col_name, ...)
-   *    [SORTED BY (col_name [ASC|DESC], ...)]
-   *    INTO num_buckets BUCKETS
-   *   ]
-   *   [LOCATION path]
-   *   [COMMENT table_comment]
-   *   [TBLPROPERTIES (property_name=property_value, ...)]
+   *   create_table_clauses
    *   [[AS] select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [OPTIONS table_property_list]
+   *     [PARTITIONED BY (col_name, col_name, ...)]
+   *     [CLUSTERED BY (col_name, col_name, ...)
+   *       [SORTED BY (col_name [ASC|DESC], ...)]
+   *       INTO num_buckets BUCKETS
+   *     ]
+   *     [LOCATION path]
+   *     [COMMENT table_comment]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
    * }}}
    */
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
     if (external) {
       operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
     }
+
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
     val schema = Option(ctx.colTypeList()).map(createSchema)
@@ -408,9 +419,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       .map(visitIdentifierList(_).toArray)
       .getOrElse(Array.empty[String])
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
 
-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
     val storage = DataSource.buildStorageFormatFromOptions(options)
 
     if (location.isDefined && storage.locationUri.isDefined) {
@@ -1087,13 +1098,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * {{{
    *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
    *   [(col1[:] data_type [COMMENT col_comment], ...)]
-   *   [COMMENT table_comment]
-   *   [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
-   *   [ROW FORMAT row_format]
-   *   [STORED AS file_format]
-   *   [LOCATION path]
-   *   [TBLPROPERTIES (property_name=property_value, ...)]
+   *   create_table_clauses
    *   [AS select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [COMMENT table_comment]
+   *     [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
+   *     [ROW FORMAT row_format]
+   *     [STORED AS file_format]
+   *     [LOCATION path]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
    * }}}
    */
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
@@ -1104,28 +1118,36 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         "CREATE TEMPORARY TABLE is not supported yet. " +
           "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
     }
-    if (ctx.skewSpec != null) {
+    if (ctx.skewSpec.size > 0) {
       operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
     }
 
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
+    checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
     val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
     val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
-    val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
-    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
 
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
     val schema = StructType(dataCols ++ partitionCols)
 
     // Storage format
     val defaultStorage = HiveSerDe.getDefaultStorage(conf)
-    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
-    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+    validateRowFormatFileFormat(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
+    val fileStorage = ctx.createFileFormat.asScala.headOption.map(visitCreateFileFormat)
       .getOrElse(CatalogStorageFormat.empty)
-    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+    val rowStorage = ctx.rowFormat.asScala.headOption.map(visitRowFormat)
       .getOrElse(CatalogStorageFormat.empty)
-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
     // If we are creating an EXTERNAL table, then the LOCATION field is required
     if (external && location.isEmpty) {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
@@ -1180,7 +1202,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         ctx)
     }
 
-    val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
+    val hasStorageProperties = (ctx.createFileFormat.size != 0) || (ctx.rowFormat.size != 0)
     if (conf.convertCTAS && !hasStorageProperties) {
       // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
       // are empty Maps.
@@ -1366,6 +1388,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
   }
 
+  private def validateRowFormatFileFormat(
+      rowFormatCtx: Seq[RowFormatContext],
+      createFileFormatCtx: Seq[CreateFileFormatContext],
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx.size == 1 && createFileFormatCtx.size == 1) {
+      validateRowFormatFileFormat(rowFormatCtx.head, createFileFormatCtx.head, parentCtx)
+    }
+  }
+
   /**
    * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
```
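
Because each clause can now match zero or more times, the generated accessors return lists rather than nullable nodes, so every `Option(ctx.x)` becomes `ctx.x.asScala.headOption`, and the new `validateRowFormatFileFormat` overload delegates only when exactly one `ROW FORMAT` and one `STORED AS` clause are present (the duplicate checks have already rejected higher counts). A small standalone sketch of that accessor change (simplified types, illustrative only):

```
// Standalone sketch of the nullable-vs-list accessor pattern.
import java.util
import scala.collection.JavaConverters._

// Before: a zero-or-one rule; the ANTLR accessor returns the node or null.
def oldStyle(node: String): Option[String] = Option(node)

// After: a zero-or-more rule; the accessor returns a java.util.List, and the
// duplicate check guarantees at most one element remains.
def newStyle(nodes: util.List[String]): Option[String] = nodes.asScala.headOption

assert(oldStyle(null) == newStyle(util.Collections.emptyList[String]()))
assert(oldStyle("LOCATION '/tmp/t'") ==
  newStyle(util.Collections.singletonList("LOCATION '/tmp/t'")))
```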