diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 12c9a972c1af..2a991acd43f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -27,8 +27,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.hive.common.{FileUtils, StatsSetupConst}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
@@ -616,6 +616,12 @@ private[hive] class HiveClientImpl(
     shim.createPartitions(client, db, table, parts, ignoreIfExists)
   }
 
+  @throws[Exception]
+  private def isExistPath(dirPath: Path): Boolean = {
+    val inpFs = dirPath.getFileSystem(conf)
+    inpFs.exists(dirPath)
+  }
+
   override def dropPartitions(
       db: String,
       table: String,
@@ -638,6 +644,17 @@ private[hive] class HiveClientImpl(
             s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
             s"database '$db'")
         }
+        // Check whether the partition we are going to drop is empty.
+        // We make a dummy one for the empty partition. See [SPARK-29786] for more details.
+        parts.foreach { partition =>
+          val partPath = partition.getPath.head
+          if (!isExistPath(partPath)) {
+            val fs = partPath.getFileSystem(conf)
+            fs.mkdirs(partPath)
+            fs.deleteOnExit(partPath)
+          }
+          partition
+        }
         parts.map(_.getValues)
       }.distinct
     var droppedParts = ArrayBuffer.empty[java.util.List[String]]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index e3797041883a..b7936cbb9c4d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -114,11 +114,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
   private var versionSpark: TestHiveVersion = null
 
+  private val hadoopConf = new Configuration()
+
   versions.foreach { version =>
     test(s"$version: create client") {
      client = null
      System.gc() // Hack to avoid SEGV on some JVM versions.
-      val hadoopConf = new Configuration()
       hadoopConf.set("test", "success")
       // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
       // hive.metastore.schema.verification from false to true since 2.0
@@ -531,6 +532,26 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
     }
 
+    test(s"$version: dropPartitions when file not exists") {
+      val partitions = (1 to testPartitionCount).map { key2 =>
+        CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
+      }
+      client.createPartitions("default", "src_part", partitions, ignoreIfExists = true)
+      val spec = Map("key1" -> "1", "key2" -> "2")
+      val hiveTable = client.getTable("default", "src_part")
+      val parts = client.getPartitions(hiveTable, Some(spec))
+      parts.foreach { partition =>
+        val partPath = new Path(partition.location)
+        val fs = partPath.getFileSystem(hadoopConf)
+        if (fs.exists(partPath)) {
+          fs.delete(partPath, true)
+        }
+      }
+      client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = false,
+        purge = false, retainData = false)
+      assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     // Function related API
     ///////////////////////////////////////////////////////////////////////////
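
A minimal reproduction sketch of the scenario this patch addresses, not part of the patch itself: the `spark` session, the `warehousePath` value, and the table name `t` are assumptions made for illustration. Dropping a partition whose directory was removed directly on the filesystem could previously fail in Hive's drop-partition call; with this change, HiveClientImpl recreates a dummy directory (marked deleteOnExit) before delegating to Hive.

import org.apache.hadoop.fs.Path

// Hypothetical reproduction against a Hive-enabled SparkSession named `spark`.
spark.sql("CREATE TABLE t (value INT) PARTITIONED BY (p INT) STORED AS PARQUET")
spark.sql("INSERT INTO t PARTITION (p = 1) VALUES (1)")

// Simulate a partition directory deleted behind Spark's back.
// `warehousePath` is an assumed variable pointing at the Hive warehouse location.
val partPath = new Path(s"$warehousePath/t/p=1")
val fs = partPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.delete(partPath, true)

// With the patch, the missing partition directory is recreated as a dummy before
// the metastore drop, so this statement no longer throws.
spark.sql("ALTER TABLE t DROP PARTITION (p = 1)")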