Address reviewer comments.
viirya committed Sep 20, 2016
commit 23ba9a23ab835987ed326a9320cf8632a0783885
@@ -198,12 +198,14 @@ case class DataSource(
       }.toArray
       val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
       val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
-      format.inferSchema(
+      val inferred = format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
-        fileCatalog.allFiles()).map { inferredSchema =>
-        partitionCols.foldLeft(inferredSchema)((struct, field) => struct.add(field))
-      }
+        fileCatalog.allFiles())
+
+      inferred.map { inferredSchema =>
+        StructType(inferredSchema ++ partitionCols)
+      }
     }.getOrElse {
       throw new AnalysisException("Unable to infer schema. It must be specified manually.")
     }
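For reference, a standalone sketch (not part of the diff; the field names are made up) of what the new `StructType(inferredSchema ++ partitionCols)` form produces: the partition columns are appended after the fields inferred from the data files, equivalent to the old `foldLeft`/`add` version.

```scala
import org.apache.spark.sql.types._

object SchemaMergeSketch extends App {
  // Stand-ins for the schema inferred from the data files and the columns
  // derived from the partition directory layout (e.g. .../partition=foo/).
  val inferredSchema = StructType(Seq(StructField("value", StringType)))
  val partitionCols  = Array(StructField("partition", StringType))

  // StructType is a Seq[StructField], so ++ simply appends the partition fields.
  val merged = StructType(inferredSchema ++ partitionCols)
  println(merged.treeString)
  // root
  //  |-- value: string (nullable = true)
  //  |-- partition: string (nullable = true)
}
```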
@@ -49,13 +49,12 @@ class FileStreamSource(
     fs.makeQualified(new Path(path)) // can contain glob patterns
   }
 
-  private val optionsWithPartitionBasePath = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
-    options.get("path").map { path =>
-      sourceOptions.optionMapWithoutPath + (("basePath", path))
-    }.getOrElse(sourceOptions.optionMapWithoutPath)
-  } else {
-    sourceOptions.optionMapWithoutPath
-  }
+  private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
+    if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
+      Map("basePath" -> path)
+    } else {
+      Map()
+    }}
 
   private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)
 
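A small plain-Scala sketch of the option handling above (the option values are hypothetical, and the real code uses `SparkHadoopUtil.get.isGlobPath` rather than the stand-in check below): when the source path is a plain directory and a `path` option is present, it is forwarded as `basePath`, so partition discovery resolves partition columns relative to that directory.

```scala
object BasePathOptionSketch extends App {
  val path = "/data/events"                                    // hypothetical source path
  val optionMapWithoutPath = Map("maxFilesPerTrigger" -> "1")  // hypothetical remaining options
  // Stand-in for SparkHadoopUtil.get.isGlobPath(new Path(path))
  val isGlobPath = path.exists(c => "{}[]*?\\".indexOf(c) >= 0)

  val optionsWithPartitionBasePath = optionMapWithoutPath ++ {
    if (!isGlobPath) Map("basePath" -> path) else Map.empty[String, String]
  }
  println(optionsWithPartitionBasePath)
  // Map(maxFilesPerTrigger -> 1, basePath -> /data/events)
}
```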
@@ -636,6 +636,46 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("when schema inference is turned on, should read partition data") {
+    def createFile(content: String, src: File, tmp: File): Unit = {
+      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+      val finalFile = new File(src, tempFile.getName)
+      src.mkdirs()
+      require(stringToFile(tempFile, content).renameTo(finalFile))
+    }
+
+    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+      withTempDirs { case (dir, tmp) =>
+        val partitionFooSubDir = new File(dir, "partition=foo")
+        val partitionBarSubDir = new File(dir, "partition=bar")
+
+        // Create files in partitions, so we can infer the schema.
+        createFile("{'value': 'drop0'}", partitionFooSubDir, tmp)
+        createFile("{'value': 'drop0'}", partitionBarSubDir, tmp)
+
+        val fileStream = createFileStream("json", s"${dir.getCanonicalPath}")
+        val filtered = fileStream.filter($"value" contains "keep")
+        testStream(filtered)(
+          // Create new partition=foo sub dir and write to it
+          AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
+          CheckAnswer(("keep2", "foo")),
+
+          // Append to same partition=foo sub dir
+          AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+          CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
+
+          // Create new partition=bar sub dir and write to it
+          AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+          CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
+
+          // Append to same partition=bar sub dir
+          AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+          CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
+        )
+      }
+    }
+  }
+
   test("fault tolerance") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("text", src.getCanonicalPath)
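For context, a rough end-user sketch of the behaviour the new test exercises (the directory layout and path below are illustrative): with `spark.sql.streaming.schemaInference` enabled, a JSON file stream over `partition=...` subdirectories picks up both the data column and the partition column.

```scala
import org.apache.spark.sql.SparkSession

object PartitionedStreamSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
  spark.conf.set("spark.sql.streaming.schemaInference", "true")

  // /tmp/events/partition=foo/... and /tmp/events/partition=bar/... hold JSON files
  // such as {"value": "keep2"}; the path is illustrative.
  val stream = spark.readStream.format("json").load("/tmp/events")
  stream.printSchema()
  // Expected to show both the inferred 'value' column and the 'partition' column,
  // with the partition column appended after the data columns.
}
```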