From 2ea2a4dd871f165523f0def2b0ca0b4b713c2585 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 24 Dec 2018 19:29:23 +0800
Subject: [PATCH 1/5] Hive CTAS creates partitioned table by specifying partition column names.

---
 .../spark/sql/catalyst/parser/SqlBase.g4     |  3 +-
 .../spark/sql/execution/SparkSqlParser.scala | 22 ++++++--
 .../sql/hive/execution/HiveDDLSuite.scala    | 55 ++++++++++++++++++-
 3 files changed, 73 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 5e732edb17ba..b39681d886c5 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -88,7 +88,8 @@ statement
         (AS? query)?                                                   #createTable
     | createTableHeader ('(' columns=colTypeList ')')?
         ((COMMENT comment=STRING) |
-        (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
+        (PARTITIONED BY '(' partitionColumns=colTypeList ')' |
+        PARTITIONED BY partitionColumnNames=identifierList) |
         bucketSpec |
         skewSpec |
         rowFormat |
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 364efea52830..340ac5ffe623 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1196,13 +1196,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
 
     selectQuery match {
       case Some(q) =>
-        // Hive does not allow to use a CTAS statement to create a partitioned table.
+        // Hive does not allow to use a CTAS statement to create a partitioned table
+        // by specifying table schema.
         if (tableDesc.partitionColumnNames.nonEmpty) {
           val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
-            "create a partitioned table using Hive's file formats. " +
+            "create a partitioned table using Hive's file formats by specifying table schema. " +
             "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
             "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
-            "CTAS statement."
+            "CTAS statement. Since Spark 3.0, you can specify partition column names " +
+            "in CTAS statement like \"PARTITIONED BY (col_name, col_name, ...)\" when using " +
+            "Hive's file formats to create a partitioned table too."
           operationNotAllowed(errorMessage, ctx)
         }
 
@@ -1213,16 +1216,25 @@
             ctx)
         }
 
+        // Hive CTAS supports dynamic partition by specifying partition column names.
+        val partitionColumnNames =
+          Option(ctx.partitionColumnNames)
+            .map(visitIdentifierList(_).toArray)
+            .getOrElse(Array.empty[String])
+
+        val tableDescWithPartitionColNames =
+          tableDesc.copy(partitionColumnNames = partitionColumnNames)
+
         val hasStorageProperties = (ctx.createFileFormat.size != 0) || (ctx.rowFormat.size != 0)
         if (conf.convertCTAS && !hasStorageProperties) {
           // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
           // are empty Maps.
-          val newTableDesc = tableDesc.copy(
+          val newTableDesc = tableDescWithPartitionColNames.copy(
             storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
             provider = Some(conf.defaultDataSourceName))
           CreateTable(newTableDesc, mode, Some(q))
         } else {
-          CreateTable(tableDesc, mode, Some(q))
+          CreateTable(tableDescWithPartitionColNames, mode, Some(q))
         }
       case None => CreateTable(tableDesc, mode, None)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fd38944a5dd2..0bc36b981aae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.File
 import java.net.URI
-import java.util.Date
 
 import scala.language.existentials
 
@@ -33,6 +32,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -2370,4 +2370,57 @@ class HiveDDLSuite
       ))
     }
   }
+
+  test("Hive CTAS can't create partitioned table by specifying schema") {
+    val err1 = intercept[ParseException] {
+      spark.sql(
+        s"""
+           |CREATE TABLE t (a int)
+           |PARTITIONED BY (b string)
+           |STORED AS parquet
+           |AS SELECT 1 as a, "a" as b
+         """.stripMargin)
+    }.getMessage
+    assert(err1.contains("A Create Table As Select (CTAS) statement is not allowed " +
+      "to create a partitioned table using Hive's file formats by specifying table schema"))
+
+    val err2 = intercept[ParseException] {
+      spark.sql(
+        s"""
+           |CREATE TABLE t
+           |PARTITIONED BY (b string)
+           |STORED AS parquet
+           |AS SELECT 1 as a, "a" as b
+         """.stripMargin)
+    }.getMessage
+    assert(err2.contains("A Create Table As Select (CTAS) statement is not allowed " +
+      "to create a partitioned table using Hive's file formats by specifying table schema"))
+  }
+
+  test("Hive CTAS with dynamic partition") {
+    Seq("orc", "parquet").foreach { format =>
+      withTable("t") {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          spark.sql(
+            s"""
+               |CREATE TABLE t
+               |PARTITIONED BY (b)
+               |STORED AS $format
+               |AS SELECT 1 as a, "a" as b
+             """.stripMargin)
+          checkAnswer(spark.table("t"), Row(1, "a"))
+
+          assert(sql("DESC t").collect().containsSlice(
+            Seq(
+              Row("a", "int", null),
+              Row("b", "string", null),
+              Row("# Partition Information", "", ""),
+              Row("# col_name", "data_type", "comment"),
+              Row("b", "string", null)
+            )
+          ))
+        }
+      }
+    }
+  }
 }

From 934d6f11e9a3c1bcb9e888c9b9a7d287bdfb5792 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 25 Dec 2018 19:47:02 +0800
Subject: [PATCH 2/5] Fix message.
---
 .../org/apache/spark/sql/execution/SparkSqlParser.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 340ac5ffe623..7da09d33a0cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1203,9 +1203,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
             "create a partitioned table using Hive's file formats by specifying table schema. " +
             "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
             "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
-            "CTAS statement. Since Spark 3.0, you can specify partition column names " +
-            "in CTAS statement like \"PARTITIONED BY (col_name, col_name, ...)\" when using " +
-            "Hive's file formats to create a partitioned table too."
+            "CTAS statement. You can specify partition column names in CTAS statement like " +
+            "\"PARTITIONED BY (col_name, col_name, ...)\" when using Hive's file formats to " +
+            "create a partitioned table too."
           operationNotAllowed(errorMessage, ctx)
         }
 

From 1a3c63c15e1b86b2b9feb80a6120bbf5085514a5 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 26 Dec 2018 14:45:43 +0800
Subject: [PATCH 3/5] Address comments.

---
 .../spark/sql/execution/SparkSqlParser.scala | 13 ++++---------
 .../sql/hive/execution/HiveDDLSuite.scala    | 19 ++++++------------
 2 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 7da09d33a0cc..f237244895c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1196,16 +1196,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
 
     selectQuery match {
       case Some(q) =>
-        // Hive does not allow to use a CTAS statement to create a partitioned table
-        // by specifying table schema.
+        // When creating partitioned table with CTAS statement, we can't specify data type for the
+        // partition columns.
         if (tableDesc.partitionColumnNames.nonEmpty) {
-          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
-            "create a partitioned table using Hive's file formats by specifying table schema. " +
-            "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
-            "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
-            "CTAS statement. You can specify partition column names in CTAS statement like " +
-            "\"PARTITIONED BY (col_name, col_name, ...)\" when using Hive's file formats to " +
-            "create a partitioned table too."
+          val errorMessage = "Create Partitioned Table As Select cannot specify data type for " +
+            "the partition columns of the target table."
           operationNotAllowed(errorMessage, ctx)
         }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0bc36b981aae..16e5b4e62129 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2381,8 +2381,8 @@ class HiveDDLSuite
            |AS SELECT 1 as a, "a" as b
          """.stripMargin)
     }.getMessage
-    assert(err1.contains("A Create Table As Select (CTAS) statement is not allowed " +
-      "to create a partitioned table using Hive's file formats by specifying table schema"))
+    assert(err1.contains("Create Partitioned Table As Select cannot specify data type for " +
+      "the partition columns of the target table"))
 
     val err2 = intercept[ParseException] {
       spark.sql(
@@ -2393,8 +2393,8 @@ class HiveDDLSuite
            |AS SELECT 1 as a, "a" as b
          """.stripMargin)
     }.getMessage
-    assert(err2.contains("A Create Table As Select (CTAS) statement is not allowed " +
-      "to create a partitioned table using Hive's file formats by specifying table schema"))
+    assert(err2.contains("Create Partitioned Table As Select cannot specify data type for " +
+      "the partition columns of the target table"))
   }
 
   test("Hive CTAS with dynamic partition") {
@@ -2410,15 +2410,8 @@ class HiveDDLSuite
              """.stripMargin)
           checkAnswer(spark.table("t"), Row(1, "a"))
 
-          assert(sql("DESC t").collect().containsSlice(
-            Seq(
-              Row("a", "int", null),
-              Row("b", "string", null),
-              Row("# Partition Information", "", ""),
-              Row("# col_name", "data_type", "comment"),
-              Row("b", "string", null)
-            )
-          ))
+          assert(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            .partitionColumnNames === Seq("b"))
         }
       }
     }

From 6cd9c2fc45e904ae080ee13a99b69f6d6a0409f2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 26 Dec 2018 15:01:39 +0800
Subject: [PATCH 4/5] Checks for data columns.

---
 .../spark/sql/execution/SparkSqlParser.scala    | 16 ++++++++--------
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  4 ++--
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index f237244895c1..8deb55b00a9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1196,21 +1196,21 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
 
     selectQuery match {
       case Some(q) =>
+        // Don't allow explicit specification of schema for CTAS.
+        if (dataCols.nonEmpty) {
+          operationNotAllowed(
+            "Schema may not be specified in a Create Table As Select (CTAS) statement",
+            ctx)
+        }
+
         // When creating partitioned table with CTAS statement, we can't specify data type for the
         // partition columns.
-        if (tableDesc.partitionColumnNames.nonEmpty) {
+        if (partitionCols.nonEmpty) {
           val errorMessage = "Create Partitioned Table As Select cannot specify data type for " +
             "the partition columns of the target table."
           operationNotAllowed(errorMessage, ctx)
         }
 
-        // Don't allow explicit specification of schema for CTAS.
-        if (schema.nonEmpty) {
-          operationNotAllowed(
-            "Schema may not be specified in a Create Table As Select (CTAS) statement",
-            ctx)
-        }
-
         // Hive CTAS supports dynamic partition by specifying partition column names.
         val partitionColumnNames =
           Option(ctx.partitionColumnNames)
             .map(visitIdentifierList(_).toArray)
             .getOrElse(Array.empty[String])
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 16e5b4e62129..6abdc4054cb0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2381,8 +2381,8 @@ class HiveDDLSuite
            |AS SELECT 1 as a, "a" as b
          """.stripMargin)
     }.getMessage
-    assert(err1.contains("Create Partitioned Table As Select cannot specify data type for " +
-      "the partition columns of the target table"))
+    assert(err1.contains("Schema may not be specified in a Create Table As Select " +
+      "(CTAS) statement"))
 
     val err2 = intercept[ParseException] {
       spark.sql(

From d56a82abe6f123b242e716d797f905ab6ad2454c Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 26 Dec 2018 22:54:51 +0800
Subject: [PATCH 5/5] Fix old test.

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6acf44606cbb..70efad103d13 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -692,8 +692,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |AS SELECT key, value FROM mytable1
       """.stripMargin)
     }.getMessage
-    assert(e.contains("A Create Table As Select (CTAS) statement is not allowed to " +
-      "create a partitioned table using Hive's file formats"))
+    assert(e.contains("Create Partitioned Table As Select cannot specify data type for " +
+      "the partition columns of the target table"))
       }
     }
   }
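
Note (not part of the patches above): a minimal sketch of how the behavior changed by this series could be exercised end to end, assuming a Hive-enabled SparkSession named `spark`; the table names `ctas_ok` / `ctas_bad` and the column names are illustrative only and do not come from the patches.

    // Dynamic partition mode is required for the partition-by-name CTAS, as in the new test.
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    // Allowed after this series: partition columns given by name only, with a Hive file format.
    spark.sql(
      """
        |CREATE TABLE ctas_ok
        |PARTITIONED BY (part)
        |STORED AS parquet
        |AS SELECT 1 AS id, 'x' AS part
      """.stripMargin)

    // Still rejected: a CTAS that spells out column data types, either in the table schema or
    // in PARTITIONED BY; it should fail with one of the "Schema may not be specified" /
    // "Create Partitioned Table As Select cannot specify data type" messages introduced above.
    spark.sql(
      """
        |CREATE TABLE ctas_bad (id INT)
        |PARTITIONED BY (part STRING)
        |STORED AS parquet
        |AS SELECT 1 AS id, 'x' AS part
      """.stripMargin)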