@@ -108,18 +108,21 @@ object ExternalCatalogUtils {
       partitionColumnNames: Seq[String],
       tablePath: Path): Path = {
     val partitionPathStrings = partitionColumnNames.map { col =>
-      val partitionValue = spec(col)
-      val partitionString = if (partitionValue == null) {
-        DEFAULT_PARTITION_NAME
-      } else {
-        escapePathName(partitionValue)
-      }
-      escapePathName(col) + "=" + partitionString
+      getPartitionPathString(col, spec(col))
     }
     partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
       new Path(totalPath, nextPartPath)
     }
   }
+
+  def getPartitionPathString(col: String, value: String): String = {
+    val partitionString = if (value == null) {
+      DEFAULT_PARTITION_NAME
+    } else {
+      escapePathName(value)
+    }
+    escapePathName(col) + "=" + partitionString
+  }
 }
 
 object CatalogUtils {
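To make the extracted helper above concrete, here is a small, self-contained sketch (not the Spark source; the toy `escapePathName` and the default partition name are simplified stand-ins) of what `getPartitionPathString` produces for a column/value pair. The point of the refactor is only to share this logic between `generatePartitionPath` and the new rename path in `HiveExternalCatalog`.

```scala
// Simplified sketch of ExternalCatalogUtils.getPartitionPathString (assumption:
// a toy escapePathName and Hive's default partition name; real escaping covers more characters).
object PartitionPathSketch {
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  // Stand-in for escapePathName: percent-encode a few characters that are unsafe in dir names.
  def escapePathName(s: String): String = {
    val unsafe = Set('/', '=', ':', '%')
    s.flatMap(c => if (unsafe(c)) f"%%${c.toInt}%02X" else c.toString)
  }

  def getPartitionPathString(col: String, value: String): String = {
    // A null partition value maps to the default partition directory name.
    val partitionString = if (value == null) DEFAULT_PARTITION_NAME else escapePathName(value)
    escapePathName(col) + "=" + partitionString
  }

  def main(args: Array[String]): Unit = {
    println(getPartitionPathString("A", "2"))    // A=2
    println(getPartitionPathString("ds", null))  // ds=__HIVE_DEFAULT_PARTITION__
  }
}
```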
@@ -892,21 +892,58 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
     if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
       val tablePath = new Path(tableMeta.location)
+      val fs = tablePath.getFileSystem(hadoopConf)
       val newParts = newSpecs.map { spec =>
+        val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
         val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-        val wrongPath = new Path(partition.location)
-        val rightPath = ExternalCatalogUtils.generatePartitionPath(
-          spec, partitionColumnNames, tablePath)
-        try {
-          tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
-        } catch {
-          case e: IOException => throw new SparkException(
-            s"Unable to rename partition path from $wrongPath to $rightPath", e)
-        }
         partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
       }
       alterPartitions(db, table, newParts)
     }
   }
 
+  /**
+   * Rename the partition directory w.r.t. the actual partition columns.
+   *
+   * It renames the partition directory recursively, starting from the first partition column,
+   * to be most compatible with different file systems. e.g. in some file systems, renaming
+   * `a=1/b=2` to `A=1/B=2` will result in `a=1/B=2`, while in some other file systems the rename
+   * works but leaves an empty directory `a=1` behind.
+   */
+  private def renamePartitionDirectory(
+      fs: FileSystem,
+      tablePath: Path,
+      partCols: Seq[String],
+      newSpec: TablePartitionSpec): Path = {
+    import ExternalCatalogUtils.getPartitionPathString
+
+    var currentFullPath = tablePath
+    partCols.foreach { col =>
+      val partValue = newSpec(col)
+      val expectedPartitionString = getPartitionPathString(col, partValue)
+      val expectedPartitionPath = new Path(currentFullPath, expectedPartitionString)
+
+      if (fs.exists(expectedPartitionPath)) {
+        // It is possible that some parent partition directories already exist or don't need to
+        // be renamed. e.g. if the partition columns are `a` and `B`, we don't need to rename
+        // `/table_path/a=1`. Or we may already have a partition directory `A=1/B=2` and rename
+        // another partition to `A=1/B=3`; then we have `A=1/B=2` and `a=1/b=3`, and we should
+        // just move `a=1/b=3` into `A=1` with the new name `B=3`.
+      } else {
+        val actualPartitionString = getPartitionPathString(col.toLowerCase, partValue)
+        val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
+        try {
+          fs.rename(actualPartitionPath, expectedPartitionPath)
+        } catch {
+          case e: IOException =>
+            throw new SparkException("Unable to rename partition path from " +
+              s"$actualPartitionPath to $expectedPartitionPath", e)
+        }
+      }
+      currentFullPath = expectedPartitionPath
+    }
+
+    currentFullPath
+  }
+
   override def alterPartitions(
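As a rough illustration of the level-by-level rename described in the Scaladoc above, here is a self-contained sketch against the local file system via `java.nio` (the object name, the simplified directory-name format, and the use of `Map` for the spec are assumptions for illustration; the real code works against Hadoop's `FileSystem` and escapes partition values):

```scala
// Sketch of the column-by-column rename idea (local-filesystem stand-in, not the Spark code).
// Walk the partition columns in order; at each level, if the expected mixed-case directory is
// missing, rename the lower-cased one the metastore created, then descend into it.
import java.nio.file.{Files, Path}

object RenamePartitionDirectorySketch {
  // Simplified: no escaping, no default-partition handling.
  def partitionDirName(col: String, value: String): String = s"$col=$value"

  def renamePartitionDirectory(
      tablePath: Path,
      partCols: Seq[String],
      newSpec: Map[String, String]): Path = {
    var currentFullPath = tablePath
    partCols.foreach { col =>
      val expected = currentFullPath.resolve(partitionDirName(col, newSpec(col)))
      if (!Files.exists(expected)) {
        // This level was written with a lower-cased column name; rename just this level.
        val actual = currentFullPath.resolve(partitionDirName(col.toLowerCase, newSpec(col)))
        Files.move(actual, expected)
      }
      currentFullPath = expected
    }
    currentFullPath
  }

  def main(args: Array[String]): Unit = {
    // On a case-sensitive local file system this performs one rename per level.
    val table = Files.createTempDirectory("table")
    // Simulate what a case-insensitive metastore rename can leave behind: a=4/b=5.
    Files.createDirectories(table.resolve("a=4").resolve("b=5"))
    val fixed = renamePartitionDirectory(table, Seq("A", "B"), Map("A" -> "4", "B" -> "5"))
    println(fixed) // .../A=4/B=5, renamed level by level, no stray a=4 left over
  }
}
```

Renaming one level at a time is what the Scaladoc is getting at: a single rename of `a=1/b=2` to `A=1/B=2` behaves differently across file systems, while per-level renames behave consistently.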
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -481,4 +484,30 @@ class PartitionProviderCompatibilitySuite
       assert(spark.sql("show partitions test").count() == 5)
     }
   }
+
+  test("SPARK-19359: renaming partition should not leave useless directories") {
+    withTable("t", "t1") {
+      Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t")
+      spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      var tablePath = new Path(table.location)
+      val fs = tablePath.getFileSystem(spark.sessionState.newHadoopConf())
+      // The `A=2` directory is still there; we follow this behavior from Hive.
+      assert(fs.listStatus(tablePath)
+        .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1)
+
+
+      Seq((1, 2, 3, 4)).toDF("id", "A", "b", "C").write.partitionBy("A", "b", "C").saveAsTable("t1")
+      spark.sql("alter table t1 partition(A=2, b=3, C=4) rename to partition(A=4, b=5, C=6)")
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+      tablePath = new Path(table.location)
+      // The `A=2` directory is still there; we follow this behavior from Hive.
+      assert(fs.listStatus(tablePath)
+        .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1)
[Inline review comment on the `filterNot` line above]

Member: If we're going to check that the `A=2` directory exists, shouldn't this use `filter`?

Contributor (author): I want to check the number of partition directories other than `A=2`.
+      assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(new Path(tablePath, "A=4"), "b=5")).count(_.isDirectory) == 1)
+    }
+  }
 }
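A tiny sketch of the point made in the review exchange above (the directory names are assumptions mirroring the test): `filterNot` excludes the stale `A=2` entry so the assertion counts only the remaining partition directories, whereas `filter` would instead be checking that `A=2` itself is present.

```scala
// Hypothetical top-level listing after the first rename in the test above.
val topLevelDirs = Seq("A=2", "A=4")
// What the test asserts: exactly one directory other than the stale A=2 (namely A=4).
assert(topLevelDirs.filterNot(_.contains("A=2")).size == 1)
// A filter(_.contains("A=2")) would instead be verifying that A=2 itself is still there.
```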