diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index dd7ef0d15c14..5fd0de802f7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -166,7 +166,15 @@ case class InsertIntoHadoopFsRelationCommand(
 
 
       // update metastore partition metadata
-      refreshUpdatedPartitions(updatedPartitionPaths)
+      if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
+        && partitionColumns.length == staticPartitions.size) {
+        // Make sure an empty static partition can still be loaded into the datasource table.
+        val staticPathFragment =
+          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
+        refreshUpdatedPartitions(Set(staticPathFragment))
+      } else {
+        refreshUpdatedPartitions(updatedPartitionPaths)
+      }
 
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f9a24806953e..d9a722b74cf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -284,6 +284,10 @@ object PartitioningUtils {
     }.mkString("/")
   }
 
+  def getPathFragment(spec: TablePartitionSpec, partitionColumns: Seq[Attribute]): String = {
+    getPathFragment(spec, StructType.fromAttributes(partitionColumns))
+  }
+
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
    * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ca95aad3976e..78df1db93692 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2249,6 +2249,68 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("Partition table should load empty static partitions") {
+    // All static partitions
+    withTable("t", "t1", "t2") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, c string, b string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
+
+        // datasource table
+        validateStaticPartitionTable("t1")
+
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validateStaticPartitionTable("t2")
+        }
+
+        def validateStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 1)
+          assert(new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
+      }
+    }
+
+    // Partially dynamic partitions
+    withTable("t", "t1", "t2") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, b string, c string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
+
+        // datasource table
+        validatePartialStaticPartitionTable("t1")
+
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validatePartialStaticPartitionTable("t2")
+        }
+
+        def validatePartialStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          assert(!new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
     val tcName = if (shouldDelete) "non-existing" else "existed"
     test(s"CTAS for external data source table with a $tcName location") {
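
For reference (not part of the patch), a minimal sketch of what the new getPathFragment(spec, partitionColumns) overload is expected to return for the all-static spec exercised in the test above. The AttributeReference construction and the direct call are illustrative only, and assume the existing StructType-based getPathFragment semantics:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types.StringType

// Partition columns in the table's partitioning order (c, b), as in the tests above.
val partitionColumns = Seq(
  AttributeReference("c", StringType)(),
  AttributeReference("b", StringType)())

// Static partition spec corresponding to PARTITION(b='b', c='c').
val staticPartitions = Map("c" -> "c", "b" -> "b")

// Expected to yield "c=c/b=b", the same directory the test asserts on with
// new File(dir, "c=c/b=b").exists(); the command then passes this fragment
// to refreshUpdatedPartitions so the empty static partition is registered.
val staticPathFragment =
  PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)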