
Commit f89cdec

viirya authored and cloud-fan committed
[SPARK-26435][SQL] Support creating partitioned table using Hive CTAS by specifying partition column names
## What changes were proposed in this pull request?

Spark SQL does not support creating a partitioned table using Hive CTAS in SQL syntax. It is, however, supported through the DataFrameWriter API:

```scala
val df = Seq(("a", 1)).toDF("part", "id")
df.write.format("hive").partitionBy("part").saveAsTable("t")
```

Hive itself supports this syntax in newer versions (https://issues.apache.org/jira/browse/HIVE-20241):

```
CREATE TABLE t PARTITIONED BY (part) AS SELECT 1 as id, "a" as part
```

This patch adds the same support to Spark's SQL syntax.

## How was this patch tested?

Added tests.

Closes #23376 from viirya/hive-ctas-partitioned-table.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
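As a quick illustration (not part of the commit), here is a minimal usage sketch of the new syntax, modeled on the tests added in HiveDDLSuite below; the Hive-enabled session, the table name `t`, and the `nonstrict` dynamic-partition setting are assumptions taken from those tests:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes Spark built with Hive support, as the added tests do.
val spark = SparkSession.builder()
  .appName("hive-ctas-partitioned-sketch")
  .enableHiveSupport()
  .getOrCreate()

// The dynamic-partition test below runs with this Hive setting.
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// New syntax added by this patch: partition columns are listed by name only;
// their types and values come from the SELECT.
spark.sql(
  """
    |CREATE TABLE t
    |PARTITIONED BY (part)
    |STORED AS parquet
    |AS SELECT 1 as id, "a" as part
  """.stripMargin)

// The resulting table is partitioned by `part`; the HiveDDLSuite test checks this
// through the catalog's table metadata.
spark.table("t").show()
```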
1 parent 7c7fccf commit f89cdec

File tree

4 files changed, +71 -17 lines


sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 1 deletion
@@ -88,7 +88,8 @@ statement
         (AS? query)?                                                   #createTable
     | createTableHeader ('(' columns=colTypeList ')')?
         ((COMMENT comment=STRING) |
-        (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
+        (PARTITIONED BY '(' partitionColumns=colTypeList ')' |
+        PARTITIONED BY partitionColumnNames=identifierList) |
         bucketSpec |
         skewSpec |
         rowFormat |
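In other words, the Hive-style rule now has two alternatives after PARTITIONED BY: the pre-existing typed column list and a new bare identifier list. A rough sketch of the two statement shapes, written as Scala strings and adapted from the tests added further down (table and column names are illustrative):

```scala
// Parsed by the new alternative: untyped partition column names;
// the partition columns' types come from the SELECT.
val ctasWithPartitionNames =
  """
    |CREATE TABLE t
    |PARTITIONED BY (part)
    |STORED AS parquet
    |AS SELECT 1 as id, "a" as part
  """.stripMargin

// Parsed by the pre-existing alternative (typed partition columns), but for a
// CTAS this is rejected later in SparkSqlAstBuilder, as the next hunk shows.
val ctasWithTypedPartitionCols =
  """
    |CREATE TABLE t
    |PARTITIONED BY (part STRING)
    |STORED AS parquet
    |AS SELECT 1 as id, "a" as part
  """.stripMargin
```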

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 20 additions & 13 deletions
@@ -1196,33 +1196,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
 
     selectQuery match {
       case Some(q) =>
-        // Hive does not allow to use a CTAS statement to create a partitioned table.
-        if (tableDesc.partitionColumnNames.nonEmpty) {
-          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
-            "create a partitioned table using Hive's file formats. " +
-            "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
-            "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
-            "CTAS statement."
-          operationNotAllowed(errorMessage, ctx)
-        }
-
         // Don't allow explicit specification of schema for CTAS.
-        if (schema.nonEmpty) {
+        if (dataCols.nonEmpty) {
           operationNotAllowed(
             "Schema may not be specified in a Create Table As Select (CTAS) statement",
             ctx)
         }
 
+        // When creating partitioned table with CTAS statement, we can't specify data type for the
+        // partition columns.
+        if (partitionCols.nonEmpty) {
+          val errorMessage = "Create Partitioned Table As Select cannot specify data type for " +
+            "the partition columns of the target table."
+          operationNotAllowed(errorMessage, ctx)
+        }
+
+        // Hive CTAS supports dynamic partition by specifying partition column names.
+        val partitionColumnNames =
+          Option(ctx.partitionColumnNames)
+            .map(visitIdentifierList(_).toArray)
+            .getOrElse(Array.empty[String])
+
+        val tableDescWithPartitionColNames =
+          tableDesc.copy(partitionColumnNames = partitionColumnNames)
+
         val hasStorageProperties = (ctx.createFileFormat.size != 0) || (ctx.rowFormat.size != 0)
         if (conf.convertCTAS && !hasStorageProperties) {
           // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
           // are empty Maps.
-          val newTableDesc = tableDesc.copy(
+          val newTableDesc = tableDescWithPartitionColNames.copy(
             storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
             provider = Some(conf.defaultDataSourceName))
           CreateTable(newTableDesc, mode, Some(q))
         } else {
-          CreateTable(tableDesc, mode, Some(q))
+          CreateTable(tableDescWithPartitionColNames, mode, Some(q))
         }
       case None => CreateTable(tableDesc, mode, None)
     }
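To summarize the guard logic this hunk introduces, here is a small standalone sketch (not Spark code; the names dataCols, partitionCols, and partitionColumnNames simply mirror those in SparkSqlAstBuilder) of the decision now made for a Hive CTAS:

```scala
// Standalone sketch of the CTAS checks added above.
final case class CtasSpec(
    dataCols: Seq[(String, String)],       // explicit schema: (name, type) pairs
    partitionCols: Seq[(String, String)],  // typed PARTITIONED BY (col type, ...)
    partitionColumnNames: Seq[String])     // untyped PARTITIONED BY (col, ...)

// Returns either an error message or the partition column names that will be
// copied into the table descriptor (tableDescWithPartitionColNames above).
def validateHiveCtas(spec: CtasSpec): Either[String, Seq[String]] = {
  if (spec.dataCols.nonEmpty) {
    Left("Schema may not be specified in a Create Table As Select (CTAS) statement")
  } else if (spec.partitionCols.nonEmpty) {
    Left("Create Partitioned Table As Select cannot specify data type for " +
      "the partition columns of the target table.")
  } else {
    Right(spec.partitionColumnNames)
  }
}
```

With this in place, a statement like `CREATE TABLE t PARTITIONED BY (part) AS SELECT ...` carries `part` into the table's partition column names, while an explicit schema or typed partition columns fail fast with the messages the tests below assert on.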

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 47 additions & 1 deletion
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.File
 import java.net.URI
-import java.util.Date
 
 import scala.language.existentials
 
@@ -33,6 +32,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -2370,4 +2370,50 @@ class HiveDDLSuite
         ))
     }
   }
+
+  test("Hive CTAS can't create partitioned table by specifying schema") {
+    val err1 = intercept[ParseException] {
+      spark.sql(
+        s"""
+           |CREATE TABLE t (a int)
+           |PARTITIONED BY (b string)
+           |STORED AS parquet
+           |AS SELECT 1 as a, "a" as b
+         """.stripMargin)
+    }.getMessage
+    assert(err1.contains("Schema may not be specified in a Create Table As Select " +
+      "(CTAS) statement"))
+
+    val err2 = intercept[ParseException] {
+      spark.sql(
+        s"""
+           |CREATE TABLE t
+           |PARTITIONED BY (b string)
+           |STORED AS parquet
+           |AS SELECT 1 as a, "a" as b
+         """.stripMargin)
+    }.getMessage
+    assert(err2.contains("Create Partitioned Table As Select cannot specify data type for " +
+      "the partition columns of the target table"))
+  }
+
+  test("Hive CTAS with dynamic partition") {
+    Seq("orc", "parquet").foreach { format =>
+      withTable("t") {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          spark.sql(
+            s"""
+               |CREATE TABLE t
+               |PARTITIONED BY (b)
+               |STORED AS $format
+               |AS SELECT 1 as a, "a" as b
+             """.stripMargin)
+          checkAnswer(spark.table("t"), Row(1, "a"))
+
+          assert(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            .partitionColumnNames === Seq("b"))
+        }
+      }
+    }
+  }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 2 additions & 2 deletions
@@ -692,8 +692,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           |AS SELECT key, value FROM mytable1
         """.stripMargin)
       }.getMessage
-      assert(e.contains("A Create Table As Select (CTAS) statement is not allowed to " +
-        "create a partitioned table using Hive's file formats"))
+      assert(e.contains("Create Partitioned Table As Select cannot specify data type for " +
+        "the partition columns of the target table"))
     }
   }
 }
