[SPARK-28098][SQL] Support read partitioned Hive tables with (#40)
(cherry picked from commit 984bf78)
catalinii authored and anuvedverma committed Dec 5, 2025
commit 6e63856edefd0674c17e9b0a197ee399032edc5c
@@ -4311,6 +4311,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val READ_PARTITION_WITH_SUBDIRECTORY_ENABLED =
buildConf("spark.sql.sources.readPartitionWithSubdirectory.enabled")
.doc("When set to true, Spark SQL could read the files of " +
" partitioned hive table from subdirectories under root path of table")
.booleanConf
.createWithDefault(true)

val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
.internal()
@@ -5254,6 +5261,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)

def readPartitionWithSubdirectoryEnabled: Boolean =
getConf(READ_PARTITION_WITH_SUBDIRECTORY_ENABLED)

def plannedWriteEnabled: Boolean = getConf(SQLConf.PLANNED_WRITE_ENABLED)

def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)
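
As a quick way to exercise the new flag end to end, here is a minimal sketch against a Hive-enabled session; the table name db.events and its on-disk layout are hypothetical, and the explicit conf.set is redundant given the default of true:

import org.apache.spark.sql.SparkSession

object ReadPartitionWithSubdirExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-partition-with-subdirectory")
      .enableHiveSupport()
      .getOrCreate()

    // Defaults to true; set explicitly here only to make the behaviour visible.
    spark.conf.set("spark.sql.sources.readPartitionWithSubdirectory.enabled", "true")

    // Hypothetical partitioned Hive table whose partition directories contain
    // nested subdirectories, e.g. dt=2020-01-01/sub_dir_1/part-00000.
    spark.table("db.events").show()

    spark.stop()
  }
}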
@@ -61,6 +61,9 @@ class InMemoryFileIndex(
override val rootPaths =
rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))

val readPartitionWithSubdirectoryEnabled =
sparkSession.sessionState.conf.readPartitionWithSubdirectoryEnabled

@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
@volatile private var cachedPartitionSpec: PartitionSpec = _
@@ -96,10 +99,25 @@ class InMemoryFileIndex(
val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
cachedLeafDirToChildrenFiles =
if (readPartitionWithSubdirectoryEnabled) {
files.toArray.groupBy(file => getRootPathsLeafDir(file.getPath.getParent, file.getPath))
} else {
files.toArray.groupBy(_.getPath.getParent)
}
cachedPartitionSpec = null
}

private def getRootPathsLeafDir(path: Path, child: Path): Path = {
if (rootPaths.contains(child)) {
path
} else if (rootPaths.contains(path)) {
path
} else {
getRootPathsLeafDir(path.getParent, path)
}
}

override def equals(other: Any): Boolean = other match {
case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
case _ => false
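
For reviewers, a standalone restatement of the walk-up logic in getRootPathsLeafDir (not the committed code itself): it climbs from a file's parent until it hits a configured root path, so leaf files buried in subdirectories are keyed by the root rather than by their immediate parent. The paths below are hypothetical, and the helper assumes every file lives under one of the roots:

import org.apache.hadoop.fs.Path

object LeafDirGroupingSketch {
  // Mirrors getRootPathsLeafDir: walk upward until `child` or `path` is a root path.
  def rootPathsLeafDir(rootPaths: Set[Path], path: Path, child: Path): Path =
    if (rootPaths.contains(child) || rootPaths.contains(path)) path
    else rootPathsLeafDir(rootPaths, path.getParent, path)

  def main(args: Array[String]): Unit = {
    val roots = Set(new Path("/warehouse/db/events"))
    val file = new Path("/warehouse/db/events/dt=2020-01-01/sub_dir_1/part-00000")
    // Prints /warehouse/db/events: the nested file is attributed to the root path
    // rather than to its immediate parent directory sub_dir_1.
    println(rootPathsLeafDir(roots, file.getParent, file))
  }
}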
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Striped
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
@@ -283,7 +284,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
LogicalRelation(
DataSource(
sparkSession = sparkSession,
paths = rootPath.toString :: Nil,
paths = getDirectoryPathSeq(rootPath),
userSpecifiedSchema = Option(updatedTable.dataSchema),
bucketSpec = hiveBucketSpec,
// Do not interpret the 'path' option at all when tables are read using the Hive
@@ -321,6 +322,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
result.copy(output = newOutput)
}

private def getDirectoryPathSeq(rootPath: Path): Seq[String] = {
val enableSupportSubDirectories =
sparkSession.sessionState.conf.readPartitionWithSubdirectoryEnabled

if (enableSupportSubDirectories) {
val fs = rootPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
SparkHadoopUtil.get.listLeafDirStatuses(fs, rootPath).map(_.getPath.toString)
} else {
rootPath.toString :: Nil
}
}

private def inferIfNeeded(
relation: HiveTableRelation,
options: Map[String, String],
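
Finally, a hedged, standalone restatement of the path expansion in getDirectoryPathSeq. It assumes it compiles inside the Spark source tree (SparkHadoopUtil is Spark-internal), and the layout mentioned in the comment is hypothetical; with the flag on, the DataSource is handed every leaf directory under the table root instead of the root alone:

package org.apache.spark.sql.hive

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil

object LeafDirExpansionSketch {
  // Sketch mirroring getDirectoryPathSeq: e.g. a root of /warehouse/db/events with
  // dt=.../sub_dir_1/part-* underneath expands to the sub_dir_1-style leaf directories.
  def directoryPaths(rootPath: Path, hadoopConf: Configuration, subdirsEnabled: Boolean): Seq[String] = {
    if (subdirsEnabled) {
      val fs = rootPath.getFileSystem(hadoopConf)
      // Same helper the patch uses: recursively collect the deepest directories under rootPath.
      SparkHadoopUtil.get.listLeafDirStatuses(fs, rootPath).map(_.getPath.toString)
    } else {
      rootPath.toString :: Nil
    }
  }
}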