Skip to content
Prev Previous commit
Next Next commit
make it simple
  • Loading branch information
gengliangwang committed Apr 12, 2018
commit 378d0ccc37c9a10637a265581d55b6219fda6cc8
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,25 @@ case class DataSource(
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(
format: FileFormat,
fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
lazy val tempFileIndex = fileIndex.getOrElse(
createInMemoryFileIndex(withFileStatusCache = false))

val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
tempFileIndex.partitionSchema
inMemoryFileIndex.partitionSchema
} else {
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
if (userSpecifiedSchema.isEmpty) {
val inferredPartitions = tempFileIndex.partitionSchema
val inferredPartitions = inMemoryFileIndex.partitionSchema
inferredPartitions
} else {
val partitionFields = partitionColumns.map { partitionColumn =>
userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
val inferredPartitions = tempFileIndex.partitionSchema
val inferredPartitions = inMemoryFileIndex.partitionSchema
val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
if (inferredOpt.isDefined) {
logDebug(
Expand Down Expand Up @@ -176,7 +173,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
tempFileIndex.allFiles())
inMemoryFileIndex.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format. It must be specified manually.")
Expand All @@ -198,21 +195,11 @@ case class DataSource(
(dataSchema, partitionSchema)
}

/** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private def createInMemoryFileIndex(
withFileStatusCache: Boolean,
checkEmptyGlobPath: Boolean = false,
checkFilesExist: Boolean = false): InMemoryFileIndex = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.flatMap(
DataSource.checkAndGlobPathIfNecessary(
hadoopConf, _, checkEmptyGlobPath, checkFilesExist)).toArray
val fileStatusCache = if (withFileStatusCache) {
FileStatusCache.getOrCreate(sparkSession)
} else {
NoopCache
}
/** An [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private lazy val inMemoryFileIndex: InMemoryFileIndex = {
  // Cache file statuses so that repeated schema/partition lookups over the same
  // paths do not re-list the file system.
  val statusCache = FileStatusCache.getOrCreate(sparkSession)
  // Resolve globs only; empty globs and missing files are tolerated here and
  // checked later by the read path when stricter validation is requested.
  val resolvedPaths =
    checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
  new InMemoryFileIndex(
    sparkSession, resolvedPaths, options, userSpecifiedSchema, statusCache)
}
Expand Down Expand Up @@ -370,10 +357,9 @@ case class DataSource(

// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true,
checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val (dataSchema, partitionSchema) =
getOrInferFileFormatSchema(format, Some(inMemoryFileIndex))
getOrInferFileFormatSchema(format)

val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
Expand Down Expand Up @@ -534,6 +520,33 @@ case class DataSource(
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}

/**
* Checks and returns files in all the paths.
*/
/**
 * Resolves every configured path (the `path` option plus `paths`) against its file
 * system, expanding glob patterns, and returns the resulting qualified paths.
 *
 * @param checkEmptyGlobPath if true, raise [[AnalysisException]] when a glob pattern
 *                           matches nothing.
 * @param checkFilesExist if true, verify that the first resolved path of each entry
 *                        exists (sufficient for the non-glob case; globs already imply
 *                        existence).
 * @return all resolved, qualified paths.
 * @throws AnalysisException if a path is missing and the corresponding check is enabled.
 */
private def checkAndGlobPathIfNecessary(
    checkEmptyGlobPath: Boolean,
    checkFilesExist: Boolean): Seq[Path] = {
  val allPaths = caseInsensitiveOptions.get("path") ++ paths
  val hadoopConf = sparkSession.sessionState.newHadoopConf()
  allPaths.flatMap { path =>
    val hdfsPath = new Path(path)
    val fs = hdfsPath.getFileSystem(hadoopConf)
    // Qualify relative paths against the file system's URI and working directory.
    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

    if (checkEmptyGlobPath && globPath.isEmpty) {
      throw new AnalysisException(s"Path does not exist: $qualified")
    }

    // Sufficient to check head of the globPath seq for non-glob scenario.
    // Don't need to check once again if files exist in streaming mode.
    // Use headOption instead of head: when checkEmptyGlobPath is false the glob
    // result may be empty, and .head would throw NoSuchElementException.
    if (checkFilesExist) {
      globPath.headOption.foreach { first =>
        if (!fs.exists(first)) {
          throw new AnalysisException(s"Path does not exist: $first")
        }
      }
    }
    globPath
  }.toSeq
}
}

object DataSource extends Logging {
Expand Down Expand Up @@ -681,31 +694,6 @@ object DataSource extends Logging {
locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
}

/**
* If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
* If `checkFilesExist` is `true`, also check the file existence.
*/
private def checkAndGlobPathIfNecessary(
hadoopConf: Configuration,
path: String,
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
// Qualify the raw path string against its file system before globbing.
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
// NOTE(review): globPath.head assumes non-empty here; safe only while callers
// pass checkEmptyGlobPath = true whenever checkFilesExist = true — confirm.
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}

/**
* Called before writing into a FileFormat based data source to make sure the
* supplied schema is not empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test where partCol < 2").count() == 2)
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 7)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class PartitionedTablePerfStatsSuite
HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 = 999").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
checkFilesDiscovered(spec.isDatasourceTable, 0)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)

HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 < 2").count()
Expand Down Expand Up @@ -188,7 +188,7 @@ class PartitionedTablePerfStatsSuite
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
checkFilesDiscovered(spec.isDatasourceTable, 0)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)

HiveCatalogMetrics.reset()
Expand Down Expand Up @@ -228,13 +228,13 @@ class PartitionedTablePerfStatsSuite
spec.setupTable("test", dir)
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test").count() == 5)
checkFilesDiscovered(spec.isDatasourceTable, 5)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)

HiveCatalogMetrics.reset()
spark.sql("refresh table test")
assert(spark.sql("select * from test").count() == 5)
checkFilesDiscovered(spec.isDatasourceTable, 5)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)

spark.catalog.cacheTable("test")
Expand All @@ -257,10 +257,10 @@ class PartitionedTablePerfStatsSuite
spec.setupTable("test", dir)
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test").count() == 5)
checkFilesDiscovered(spec.isDatasourceTable, 5)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
assert(spark.sql("select * from test").count() == 5)
checkFilesDiscovered(spec.isDatasourceTable, 10)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
Expand All @@ -273,7 +273,7 @@ class PartitionedTablePerfStatsSuite
withTempDir { dir =>
HiveCatalogMetrics.reset()
setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
Expand Down