Skip to content
Prev Previous commit
Next Next commit
use the old way
  • Loading branch information
gengliangwang committed Apr 12, 2018
commit 00438cdb756e47ea9575a29d8e44e09550df5eaa
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
Expand Down Expand Up @@ -121,24 +122,32 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
* @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
private def getOrInferFileFormatSchema(
format: FileFormat,
fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
lazy val tempFileIndex = fileIndex.getOrElse(
createInMemoryFileIndex(withFileStatusCache = false))
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
inMemoryFileIndex.partitionSchema
tempFileIndex.partitionSchema
} else {
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
if (userSpecifiedSchema.isEmpty) {
val inferredPartitions = inMemoryFileIndex.partitionSchema
val inferredPartitions = tempFileIndex.partitionSchema
inferredPartitions
} else {
val partitionFields = partitionColumns.map { partitionColumn =>
userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
val inferredPartitions = inMemoryFileIndex.partitionSchema
val inferredPartitions = tempFileIndex.partitionSchema
val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
if (inferredOpt.isDefined) {
logDebug(
Expand Down Expand Up @@ -167,7 +176,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
inMemoryFileIndex.allFiles())
tempFileIndex.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format. It must be specified manually.")
Expand All @@ -189,16 +198,21 @@ case class DataSource(
(dataSchema, partitionSchema)
}

/**
* An [[InMemoryFileIndex]] that can be used to get partition schema and file list.
* The operations below are expensive therefore try not to do them if we don't need to, e.g.,
* in streaming mode, we have already inferred and registered partition columns, we will
* never have to materialize the lazy val below
*/
private lazy val inMemoryFileIndex: InMemoryFileIndex = {
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
/** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private def createInMemoryFileIndex(
withFileStatusCache: Boolean,
checkEmptyGlobPath: Boolean = false,
checkFilesExist: Boolean = false): InMemoryFileIndex = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.flatMap(
DataSource.checkAndGlobPathIfNecessary(
hadoopConf, _, checkEmptyGlobPath, checkFilesExist)).toArray
val fileStatusCache = if (withFileStatusCache) {
FileStatusCache.getOrCreate(sparkSession)
} else {
NoopCache
}
new InMemoryFileIndex(
sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}
Expand Down Expand Up @@ -356,8 +370,10 @@ case class DataSource(

// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true,
checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val (dataSchema, partitionSchema) =
getOrInferFileFormatSchema(format, Some(inMemoryFileIndex))

val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
Expand Down Expand Up @@ -518,33 +534,6 @@ case class DataSource(
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}

/**
* Checks and returns files in all the paths.
*/
private def checkAndGlobPathIfNecessary(
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}

// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}.toSeq
}
}

object DataSource extends Logging {
Expand Down Expand Up @@ -692,6 +681,31 @@ object DataSource extends Logging {
locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
}

/**
* If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
* If `checkFilesExist` is `true`, also check the file existence.
*/
private def checkAndGlobPathIfNecessary(
hadoopConf: Configuration,
path: String,
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}

/**
* Called before writing into a FileFormat based data source to make sure the
* supplied schema is not empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,12 @@ abstract class PartitioningAwareFileIndex(
// we need to cast into the data type that user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
InternalRow((0 until row.numFields).map { i =>
val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType
val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match {
case StringType => Literal.create(row.getUTF8String(i), StringType)
case otherType => Literal.create(row.get(i, otherType))
}
Cast(
Literal.create(row.get(i, dt), dt),
expr,
userPartitionSchema.fields(i).dataType,
Option(timeZoneId)).eval()
}: _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite
HiveCatalogMetrics.reset()
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
}
}
}