Fix order
wangyum committed Jul 31, 2018
commit 46bfcc6ebece630a8da3a22240889ac5a8ec9d5f
InsertIntoHadoopFsRelationCommand.scala
@@ -169,7 +169,9 @@ case class InsertIntoHadoopFsRelationCommand(
       if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
         && partitionColumns.length == staticPartitions.size) {
         // Avoid that an empty static partition can't be loaded into a datasource table.
-        refreshUpdatedPartitions(Set(PartitioningUtils.getPathFragment(staticPartitions)))
+        val staticPathFragment =
+          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
+        refreshUpdatedPartitions(Set(staticPathFragment))
       } else {
         refreshUpdatedPartitions(updatedPartitionPaths)
       }
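
Why this matters: the old one-argument getPathFragment built the path fragment from the spec map's iteration order, which need not match the table's declared partition column order; the new call passes partitionColumns so the fragment follows the declared order. A minimal, self-contained sketch of the difference (plain Scala with hypothetical helper names, path escaping omitted; not the Spark implementation):

// fragmentFromMap mirrors the old one-argument getPathFragment,
// fragmentFromColumns the new two-argument one.
object PathFragmentSketch {
  // Old behavior: fragment order is whatever order the spec map iterates in.
  def fragmentFromMap(spec: Map[String, String]): String =
    spec.map { case (k, v) => s"$k=$v" }.mkString("/")

  // New behavior: fragment order follows the declared partition columns.
  def fragmentFromColumns(spec: Map[String, String], columns: Seq[String]): String =
    columns.map(c => s"$c=${spec(c)}").mkString("/")

  def main(args: Array[String]): Unit = {
    // Table partitioned by (c, b), but the spec map was built as (b, c);
    // small immutable Maps iterate in insertion order, so the two disagree.
    val spec = Map("b" -> "b", "c" -> "c")
    println(fragmentFromMap(spec))                     // b=b/c=c (spec-map order)
    println(fragmentFromColumns(spec, Seq("c", "b")))  // c=c/b=b (declared order)
  }
}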
PartitioningUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -284,10 +284,9 @@ object PartitioningUtils {
     }.mkString("/")
   }
 
-  def getPathFragment(partitions: TablePartitionSpec): String = partitions.map {
-    case (k, v) =>
-      escapePathName(k) + "=" + escapePathName(v)
-  }.mkString("/")
+  def getPathFragment(spec: TablePartitionSpec, partitionColumns: Seq[Attribute]): String = {
+    getPathFragment(spec, StructType.fromAttributes(partitionColumns))
+  }
 
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
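
Note that the new overload simply reuses the schema-based getPathFragment defined just above it (its tail is visible in the context lines), and StructType.fromAttributes preserves the order of partitionColumns; that order is what ends up in the path fragment. A small sketch of the conversion, with illustrative attributes:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{StringType, StructType}

// fromAttributes keeps the attribute order, so a table partitioned by (c, b)
// yields a schema whose fields are (c, b) in that order.
val partitionColumns = Seq(
  AttributeReference("c", StringType)(),
  AttributeReference("b", StringType)())
StructType.fromAttributes(partitionColumns).fieldNames.toSeq  // Seq("c", "b")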
DDLSuite.scala
@@ -2254,8 +2254,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     withTable("t", "t1") {
       withTempPath { dir =>
         spark.sql("CREATE TABLE t(a int) USING parquet")
-        spark.sql("CREATE TABLE t1(a int, b string, c string) " +
-          s"USING parquet PARTITIONED BY(b, c) LOCATION '${dir.toURI}'")
+        spark.sql("CREATE TABLE t1(a int, c string, b string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
         assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
@@ -2266,7 +2266,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
 
         assert(spark.sql("SHOW PARTITIONS t1").count() == 1)
 
-        assert(new File(dir, "b=b/c=c").exists())
+        assert(new File(dir, "c=c/b=b").exists())
 
         checkAnswer(spark.table("t1"), Nil)
       }
@@ -2277,18 +2277,18 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       withTempPath { dir =>
         spark.sql("CREATE TABLE t(a int) USING parquet")
         spark.sql("CREATE TABLE t1(a int, b string, c string) " +
-          s"USING parquet PARTITIONED BY(b, c) LOCATION '${dir.toURI}'")
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
         assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
 
-        spark.sql("INSERT INTO TABLE t1 PARTITION(b='b', c) SELECT *, 'c' FROM t WHERE 1 = 0")
+        spark.sql("INSERT INTO TABLE t1 PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
 
         assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
 
-        assert(!new File(dir, "b=b/c=c").exists())
+        assert(!new File(dir, "c=c/b=b").exists())
 
         checkAnswer(spark.table("t1"), Nil)
       }
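
Taken together, the updated tests pin the expected layout to the declared partition column order: with a table partitioned by (c, b), a fully static insert should create c=c/b=b rather than b=b/c=c. A sketch of that expectation in spark-shell, with an illustrative table and values:

spark.sql("CREATE TABLE t1(a int, b string, c string) USING parquet PARTITIONED BY(c, b)")
spark.sql("INSERT INTO TABLE t1 PARTITION(c='c', b='b') SELECT 1")
// The partition directory should be <table location>/c=c/b=b, i.e. the
// declared partition column order, not the spec map's iteration order.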