sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

@@ -27,8 +27,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.hive.common.{FileUtils, StatsSetupConst}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
@@ -616,6 +616,23 @@ private[hive] class HiveClientImpl(
     shim.createPartitions(client, db, table, parts, ignoreIfExists)
   }
 
+  val HIDDEN_FILES_PATH_FILTER: PathFilter = (p: Path) => {
+    val name = p.getName
+    !name.startsWith("_") && !name.startsWith(".")
+  }
+
+  @throws[Exception]
+  private def isEmptyPath(dirPath: Path): Boolean = {
+    val inpFs = dirPath.getFileSystem(conf)
+    if (inpFs.exists(dirPath)) {
+      val fStats = inpFs.listStatus(dirPath, HIDDEN_FILES_PATH_FILTER)
+      if (fStats.nonEmpty) {
+        return false
+      }
+    }
+    true
+  }
+
   override def dropPartitions(
       db: String,
       table: String,
@@ -638,6 +655,17 @@ private[hive] class HiveClientImpl(
s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
s"database '$db'")
}
// Check whether the partition we are going to drop is empty.
// We make a dummy one for the empty partition. See [SPARK-29786] for more details.

Contributor:
Is this how Hive resolves the problem?

Contributor Author (@Deegue):
> Is this how Hive resolves the problem?

Yes, it's the same method as Hive uses.

Contributor:
Isn't it bad for performance? I.e., you call fs.exists and fs.listStatus for each partition.

Contributor Author (@Deegue, Mar 17, 2020):
> Isn't it bad for performance? I.e., you call fs.exists and fs.listStatus for each partition.

Yes, but it only affects drop partitions. I think the check is necessary and won't take much time while dropping.
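
For context, a minimal sketch (not part of this PR) of how the two filesystem RPCs could be collapsed into one: FileSystem.listStatus already throws FileNotFoundException for a missing directory, so the separate exists call can be dropped. The names EmptyPathCheck and hiddenFilesFilter are illustrative only.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}

object EmptyPathCheck {
  // Same filter as the PR: skip files Hive treats as hidden.
  private val hiddenFilesFilter: PathFilter = (p: Path) => {
    val name = p.getName
    !name.startsWith("_") && !name.startsWith(".")
  }

  /** True when the directory is missing or holds only hidden files. */
  def isEmptyPath(dirPath: Path, conf: Configuration): Boolean = {
    val fs = dirPath.getFileSystem(conf)
    try {
      fs.listStatus(dirPath, hiddenFilesFilter).isEmpty
    } catch {
      // listStatus on a missing path fails, so one RPC covers both cases.
      case _: FileNotFoundException => true
    }
  }
}
```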

Contributor:
Can you point to the Hive source code that does the same thing, i.e. creates a dummy directory before dropping the partition?

Contributor Author (@Deegue):
> Can you point to the Hive source code that does the same thing, i.e. creates a dummy directory before dropping the partition?

In Hive 1.x, it's like this.

Contributor:
Is it for DROP PARTITION?

Contributor Author (@Deegue):
> Is it for DROP PARTITION?

No, it checks every query before executing. Maybe it's better to do the check before all queries?

Contributor:
Does Spark have a problem doing a table scan when the partition directory does not exist?

Contributor Author (@Deegue, Mar 26, 2020):
> Does Spark have a problem doing a table scan when the partition directory does not exist?

It's related to #24668 and controlled by spark.sql.files.ignoreMissingFiles. Spark checks it when listing leaf files.
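
To illustrate the flag being discussed (a sketch, not from the PR; the table name src_part is borrowed from the test below):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-missing-files-demo")
  .enableHiveSupport()
  .getOrCreate()

// With the default (false), scanning a partitioned table whose partition
// directory was deleted out-of-band can fail while listing leaf files.
// Setting the flag makes the scan skip paths that have disappeared.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
spark.sql("SELECT * FROM src_part WHERE key1 = '1'").show()
```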

+      parts.foreach { partition =>
+        val partPath = partition.getPath.head
+        if (isEmptyPath(partPath)) {

Contributor:
I think we need to check for a non-existing path, not an empty path?

Contributor Author (@Deegue):
> I think we need to check for a non-existing path, not an empty path?

Yes, you're right. We only need to check the existence of the path, not the files under it.
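
A sketch of the simpler variant agreed on here (assumed, not the PR's final code): only test that the partition directory exists, and recreate a dummy one when it does not. PartitionPathGuard and ensurePartitionPath are hypothetical names.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object PartitionPathGuard {
  // Hypothetical helper: make sure the partition directory exists before the
  // Hive drop call; the dummy directory is deleted again at JVM exit.
  def ensurePartitionPath(partPath: Path, conf: Configuration): Unit = {
    val fs = partPath.getFileSystem(conf)
    if (!fs.exists(partPath)) {
      fs.mkdirs(partPath)
      fs.deleteOnExit(partPath)
    }
  }
}
```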

+          val fs = partPath.getFileSystem(conf)
+          fs.mkdirs(partPath)
+          fs.deleteOnExit(partPath)
+        }
+        partition
+      }
       parts.map(_.getValues)
     }.distinct
     var droppedParts = ArrayBuffer.empty[java.util.List[String]]
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

@@ -114,11 +114,12 @@ class VersionsSuite extends SparkFunSuite with Logging {

   private var versionSpark: TestHiveVersion = null
 
+  private val hadoopConf = new Configuration()
+
   versions.foreach { version =>
     test(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
-      val hadoopConf = new Configuration()
       hadoopConf.set("test", "success")
       // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
       // hive.metastore.schema.verification from false to true since 2.0
@@ -531,6 +532,26 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
     }

test(s"$version: dropPartitions when file not exists") {
val partitions = (1 to testPartitionCount).map { key2 =>
CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
}
client.createPartitions("default", "src_part", partitions, ignoreIfExists = true)
val spec = Map("key1" -> "1", "key2" -> "2")
val hiveTable = client.getTable("default", "src_part")
val parts = client.getPartitions(hiveTable, Some(spec))
parts.foreach { partition =>
val partPath = new Path(partition.location)
val fs = partPath.getFileSystem(hadoopConf)
if (fs.exists(partPath)) {
fs.delete(partPath, true)
}
}
client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = false,
purge = false, retainData = false)
assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
}

   ///////////////////////////////////////////////////////////////////////////
   // Function related API
   ///////////////////////////////////////////////////////////////////////////