best refactor
gengliangwang committed Apr 12, 2018
commit 9a2af2dbbebb5f5f37ee5ee856c7dcf6a71b45aa
@@ -95,6 +95,14 @@ case class DataSource(
lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
// The operations below are expensive, so try not to perform them unless necessary; e.g., in
// streaming mode the partition columns have already been inferred and registered, so the
// lazy val below will never be materialized.
private lazy val tempFileIndex = {
Contributor:

It's only used once, so there's no need for a lazy val; we can just inline it.

Member Author:

I moved it here on purpose, so that it may avoid being created twice in the future. I am OK with inlining it.

Contributor:

Let's just inline it. People can still create a new index in the future; technically this can't prevent them from doing that.

val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
}
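The debate above is really about lazy val memoization. A minimal, standalone sketch of the trade-off (illustrative code only, not Spark's):

```scala
object LazyValVsDef extends App {
  // Stand-in for the expensive file listing behind an InMemoryFileIndex.
  def buildIndex(): Vector[String] = {
    println("expensive scan...")
    Vector("part=1/a.parquet", "part=2/b.parquet")
  }

  lazy val cachedIndex = buildIndex() // computed at most once, on first access
  def freshIndex = buildIndex()       // recomputed on every access

  cachedIndex; cachedIndex // "expensive scan..." printed once
  freshIndex; freshIndex   // printed twice
}
```

A lazy val protects a hypothetical future caller from building the index twice; inlining keeps the code simpler but offers no such guarantee, which is the gist of the two positions above.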

bucketSpec.map { bucket =>
SchemaUtils.checkColumnNameDuplication(
@@ -122,32 +130,29 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
* @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
* @param optionalFileIndex optional [[FileIndex]] for getting partition schema and file list
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(
format: FileFormat,
fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
lazy val tempFileIndex = fileIndex.getOrElse(
createInMemoryFileIndex(withFileStatusCache = false))
optionalFileIndex: Option[FileIndex] = None): (StructType, StructType) = {
def fileIndex = optionalFileIndex.getOrElse(tempFileIndex)

val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
tempFileIndex.partitionSchema
fileIndex.partitionSchema
} else {
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
if (userSpecifiedSchema.isEmpty) {
val inferredPartitions = tempFileIndex.partitionSchema
val inferredPartitions = fileIndex.partitionSchema
inferredPartitions
} else {
val partitionFields = partitionColumns.map { partitionColumn =>
userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
val inferredPartitions = tempFileIndex.partitionSchema
val inferredPartitions = fileIndex.partitionSchema
val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
if (inferredOpt.isDefined) {
logDebug(
@@ -173,10 +178,14 @@ case class DataSource(
val dataSchema = userSpecifiedSchema.map { schema =>
StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
}.orElse {
val index = fileIndex match {
case i: InMemoryFileIndex => i
case _ => tempFileIndex
}
Contributor:

why?

format.inferSchema(
sparkSession,
caseInsensitiveOptions,
tempFileIndex.allFiles())
index.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format. It must be specified manually.")
@@ -198,25 +207,6 @@ case class DataSource(
(dataSchema, partitionSchema)
}
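Stripped of Spark's types, the precedence rule this method applies to partition columns is roughly the following sketch (the case class and helper names are hypothetical simplifications, not Spark API):

```scala
case class Field(name: String)

// Mirrors sparkSession.sessionState.conf.resolver (case-insensitive by default).
val equality: (String, String) => Boolean = _.equalsIgnoreCase(_)

def resolvePartitionSchema(
    partitionColumns: Seq[String],      // columns the user asked to partition by
    userSchema: Option[Seq[Field]],     // user-specified schema, if any
    inferred: Seq[Field]): Seq[Field] = // schema inferred from the file index
  if (partitionColumns.isEmpty || userSchema.isEmpty) {
    // No user hints (or pre-SPARK-18510 behavior): trust inference.
    inferred
  } else {
    partitionColumns.map { col =>
      userSchema.get.find(f => equality(f.name, col))      // prefer the user's type
        .orElse(inferred.find(f => equality(f.name, col))) // fall back to inference
        .getOrElse(sys.error(s"Partition column $col not found in schema"))
    }
  }
```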

/** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private def createInMemoryFileIndex(
withFileStatusCache: Boolean,
checkEmptyGlobPath: Boolean = false,
checkFilesExist: Boolean = false): InMemoryFileIndex = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.flatMap(
DataSource.checkAndGlobPathIfNecessary(
hadoopConf, _, checkEmptyGlobPath, checkFilesExist)).toArray
val fileStatusCache = if (withFileStatusCache) {
FileStatusCache.getOrCreate(sparkSession)
} else {
NoopCache
}
new InMemoryFileIndex(
sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}

/** Returns the name and schema of the source that can be used to continually read data. */
private def sourceSchema(): SourceInfo = {
providingClass.newInstance() match {
@@ -370,11 +360,8 @@ case class DataSource(

// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true,
checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val (dataSchema, partitionSchema) =
getOrInferFileFormatSchema(format, Some(inMemoryFileIndex))

val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
@@ -383,8 +370,10 @@ case class DataSource(
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
} else {
inMemoryFileIndex
createInMemoryFileIndex(globbedPaths)
}
val (dataSchema, partitionSchema) =
getOrInferFileFormatSchema(format, Some(fileCatalog))

HadoopFsRelation(
fileCatalog,
@@ -534,6 +523,40 @@ case class DataSource(
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}

/** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
Contributor:

This can be def createInMemoryFileIndex(checkEmptyGlobPath: Boolean).

Contributor:

And we could merge checkAndGlobPathIfNecessary and createInMemoryFileIndex.

Member Author:

No, we can't. In some cases we need to check the globbed paths but don't need to create an InMemoryFileIndex.

val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}
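For reference, this helper composes with checkAndGlobPathIfNecessary (defined next) exactly as the non-streaming read path above does: resolve and validate paths first, then build the index from them. A sketch with illustrative flag values:

```scala
val globbedPaths =
  checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = true)
val fileIndex = createInMemoryFileIndex(globbedPaths)
```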

/**
* Checks the input paths, expanding globs if necessary, and returns all resolved paths.
*/
private def checkAndGlobPathIfNecessary(
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}

// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}.toSeq
}
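SparkHadoopUtil.globPathIfNecessary is a Spark-internal helper; its contract can be approximated with the public Hadoop API as below (a simplified sketch, assuming the usual glob metacharacters trigger expansion):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

def globIfNecessary(fs: FileSystem, qualified: Path): Seq[Path] = {
  val globChars = "{}[]*?\\"
  if (qualified.toString.exists(c => globChars.indexOf(c) >= 0)) {
    // globStatus returns null when nothing matches the pattern.
    Option(fs.globStatus(qualified)).toSeq.flatten.map(_.getPath)
  } else {
    Seq(qualified) // no glob characters: return the path untouched
  }
}
```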
}

object DataSource extends Logging {
@@ -681,31 +704,6 @@ object DataSource extends Logging {
locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
}

/**
* If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
* If `checkFilesExist` is `true`, also check the file existence.
*/
private def checkAndGlobPathIfNecessary(
hadoopConf: Configuration,
path: String,
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}

/**
* Called before writing into a FileFormat based data source to make sure the
* supplied schema is not empty.
@@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite
HiveCatalogMetrics.reset()
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
}