Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.common.{FileUtils, StatsSetupConst}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
Expand Down Expand Up @@ -616,6 +616,18 @@ private[hive] class HiveClientImpl(
shim.createPartitions(client, db, table, parts, ignoreIfExists)
}

@throws[Exception]
private def isEmptyPath(dirPath: Path): Boolean = {
val inpFs = dirPath.getFileSystem(conf)
if (inpFs.exists(dirPath)) {
val fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER)
if (fStats.nonEmpty) {
return false
}
}
true
}

override def dropPartitions(
db: String,
table: String,
Expand All @@ -633,6 +645,17 @@ private[hive] class HiveClientImpl(
// whose specs are supersets of this partial spec. E.g. If a table has partitions
// (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
val parts = client.getPartitions(hiveTable, s.asJava).asScala
// Check whether the partition we are going to drop is empty.
// We make a dummy one for the empty partition. See [SPARK-29786] for more details.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this how hive resolve the problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this how hive resolve the problem?

Yes, It's the same method as Hive uses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it bad for performance? i.e. you call fs.exists and fs.listStatus for each partition.

Copy link
Contributor Author

@Deegue Deegue Mar 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it bad for performance? i.e. you call fs.exists and fs.listStatus for each partition.

Yes, but only affect drop partitions. I think it's necessary and won't take much time to do the check while dropping.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you point to the Hive source code that does the same thing? i.e. create a dummy directory before dropping the partition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you point to the Hive source code that does the same thing? i.e. create a dummy directory before dropping the partition.

In Hive 1.x, it's like this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it for DROP PARTITION?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it for DROP PARTITION?

No, it will check every query before executing. Maybe it's better to do the check before all queries?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Spark have a problem to do table scan when partition directory not exist?

Copy link
Contributor Author

@Deegue Deegue Mar 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Spark have a problem to do table scan when partition directory not exist?

It's related to #24668, and controlled by spark.sql.files.ignoreMissingFiles.
Spark will check it when listing leaf files.

parts.foreach { partition =>
val partPath = partition.getDataLocation
if (isEmptyPath(partPath)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to check non-existing path, not empty path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to check non-existing path, not empty path?

Yes, you're right. We only need to check the existence of path instead of those under the path.

val fs = partPath.getFileSystem(conf)
fs.mkdirs(partPath)
fs.deleteOnExit(partPath)
}
partition
}
if (parts.isEmpty && !ignoreIfNotExists) {
throw new AnalysisException(
s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
Expand Down