@@ -129,13 +129,20 @@ class FileStreamSource(
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
val newOptions = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
@frreiss (Contributor), Aug 29, 2016:
I recommend putting this check for globs into the initialization code at the top of this file that sets qualifiedBasePath (currently lines 47-50). That way all the code that interprets the meaning of the path parameter will be in one place.

Member (Author) replied:
ok.

options.get("path").map { path =>
sourceOptions.optionMapWithoutPath + (("basePath", path))
}.getOrElse(sourceOptions.optionMapWithoutPath)
} else {
sourceOptions.optionMapWithoutPath
}
val newDataSource =
DataSource(
sparkSession,
paths = files.map(_.path),
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = sourceOptions.optionMapWithoutPath)
options = newOptions)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}
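
Regarding the review suggestion above about moving the glob check next to qualifiedBasePath: a minimal sketch of that refactor, assuming the `path`, `options`, and `sourceOptions` members already defined near the top of FileStreamSource. This illustrates the reviewer's idea, not the change made in this diff, and the val name is arbitrary:

  // Evaluated once at initialization, alongside qualifiedBasePath, instead of
  // inside getBatch on every batch. When the source path is not a glob, pass it
  // through as "basePath" so partition columns are still discovered.
  private val optionsWithPartitionBasePath: Map[String, String] = {
    if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
      options.get("path")
        .map(p => sourceOptions.optionMapWithoutPath + ("basePath" -> p))
        .getOrElse(sourceOptions.optionMapWithoutPath)
    } else {
      sourceOptions.optionMapWithoutPath
    }
  }

getBatch would then simply pass options = optionsWithPartitionBasePath when constructing the DataSource.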

@@ -601,6 +601,34 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

// =============== other tests ================

test("read new files in partitioned table without globbing, should read partition data") {
Contributor commented:
We should probably also have an explicit test for the case where schema inference is turned on (you implicitly test it some with the code changed below)

Member (Author) replied:
Added a test for it.

withTempDirs { case (dir, tmp) =>
val partitionFooSubDir = new File(dir, "partition=foo")
val partitionBarSubDir = new File(dir, "partition=bar")

val schema = new StructType().add("value", StringType).add("partition", StringType)
val fileStream = createFileStream("json", s"${dir.getCanonicalPath}", Some(schema))
val filtered = fileStream.filter($"value" contains "keep")
testStream(filtered)(
// Create new partition=foo sub dir and write to it
AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
CheckAnswer(("keep2", "foo")),

// Append to same partition=foo sub dir
AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo")),

// Create new partition=bar sub dir and write to it
AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),

// Append to same partition=bar sub dir
AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
)
}
}
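
The schema-inference test the author mentions above is not shown in this hunk. A hedged sketch of what such a test could look like, reusing this suite's helpers (withTempDirs, createFileStream, AddTextFileData, CheckAnswer) and assuming the spark.sql.streaming.schemaInference flag and a stringToFile helper in scope to seed a file before the stream starts; the actual test added in the PR may differ:

  test("when schema inference is turned on, should read partition data (sketch)") {
    withSQLConf("spark.sql.streaming.schemaInference" -> "true") {
      withTempDirs { case (dir, tmp) =>
        val partitionFooSubDir = new File(dir, "partition=foo")
        val partitionBarSubDir = new File(dir, "partition=bar")

        // Seed one file before the stream starts so the schema, including the
        // partition column, can be inferred instead of being user-specified.
        partitionFooSubDir.mkdirs()
        stringToFile(new File(partitionFooSubDir, "seed"), "{'value': 'drop0'}")

        // No explicit schema is passed to createFileStream here.
        val fileStream = createFileStream("json", s"${dir.getCanonicalPath}")
        val filtered = fileStream.filter($"value" contains "keep")
        testStream(filtered)(
          AddTextFileData("{'value': 'keep1'}", partitionFooSubDir, tmp),
          CheckAnswer(("keep1", "foo")),
          AddTextFileData("{'value': 'keep2'}", partitionBarSubDir, tmp),
          CheckAnswer(("keep1", "foo"), ("keep2", "bar"))
        )
      }
    }
  }

The point of the variant is that no schema is passed, so the partition column has to come out of inference, which the file source only permits when spark.sql.streaming.schemaInference is enabled.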

test("fault tolerance") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)