@@ -197,10 +197,13 @@ case class DataSource(
    SparkHadoopUtil.get.globPathIfNecessary(qualified)
  }.toArray
  val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
  val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
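  // Append each discovered partition column to the inferred file schema so that
  // the streaming source exposes partition values as ordinary columns.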
  format.inferSchema(
    sparkSession,
    caseInsensitiveOptions,
-   fileCatalog.allFiles())
+   fileCatalog.allFiles()).map { inferredSchema =>
Member:

The wrapping threw me off for a minute. You might consider introducing an extra variable for the result of inferSchema. Also does .map(partitionCols.foldLeft(_)((struct, field) => struct.add(field))) work?
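Spelled out without the underscore placeholder, the one-liner being asked about corresponds to roughly this sketch (reusing the names from the diff above; whether the placeholder form itself compiles as intended is the open question here):

format.inferSchema(
    sparkSession,
    caseInsensitiveOptions,
    fileCatalog.allFiles())
  .map(inferredSchema =>
    // Fold each partition column onto the inferred schema, appending it as a trailing field.
    partitionCols.foldLeft(inferredSchema)((struct, field) => struct.add(field)))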

Member Author:

I will change it to use @marmbrus's version below.

Contributor:

+1

I think this is much easier to understand as:

val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
val inferred = format.inferSchema(
  sparkSession,
  caseInsensitiveOptions,
  fileCatalog.allFiles())

inferred.map { inferredSchema =>
  StructType(inferredSchema ++ partitionCols)
}

    partitionCols.foldLeft(inferredSchema)((struct, field) => struct.add(field))
  }
}.getOrElse {
  throw new AnalysisException("Unable to infer schema. It must be specified manually.")
}
@@ -49,6 +49,14 @@ class FileStreamSource(
  fs.makeQualified(new Path(path)) // can contain glob patterns
}

private val optionsWithPartitionBasePath = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
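  // Not a glob: pass the user-given path down as the "basePath" option so that each
  // batch's DataSource discovers partition columns relative to that directory.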
Member:

Also just a style thing but is this more direct?

private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
  if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
    Map("basePath" -> path)
  } else {
    Map()
  }
}

Not sure; it just avoids some repetition, but it's about the same amount of code.

Member Author:

OK. Looks better.

  options.get("path").map { path =>
    sourceOptions.optionMapWithoutPath + (("basePath", path))
  }.getOrElse(sourceOptions.optionMapWithoutPath)
} else {
  sourceOptions.optionMapWithoutPath
}

private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)

private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
@@ -135,7 +143,7 @@ class FileStreamSource(
    paths = files.map(_.path),
    userSpecifiedSchema = Some(schema),
    className = fileFormatClassName,
-   options = sourceOptions.optionMapWithoutPath)
+   options = optionsWithPartitionBasePath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}

@@ -608,6 +608,34 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

// =============== other tests ================

test("read new files in partitioned table without globbing, should read partition data") {
Contributor:

We should probably also have an explicit test for the case where schema inference is turned on (you implicitly test it some with the code changed below)

Member Author:

Added a test for it (a sketch of what such a test could look like follows the test below).

  withTempDirs { case (dir, tmp) =>
    val partitionFooSubDir = new File(dir, "partition=foo")
    val partitionBarSubDir = new File(dir, "partition=bar")

    val schema = new StructType().add("value", StringType).add("partition", StringType)
    val fileStream = createFileStream("json", s"${dir.getCanonicalPath}", Some(schema))
    val filtered = fileStream.filter($"value" contains "keep")
    testStream(filtered)(
      // Create new partition=foo sub dir and write to it
      AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
      CheckAnswer(("keep2", "foo")),

      // Append to same partition=foo sub dir
      AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
      CheckAnswer(("keep2", "foo"), ("keep3", "foo")),

      // Create new partition=bar sub dir and write to it
      AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
      CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),

      // Append to same partition=bar sub dir
      AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
      CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
    )
  }
}
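
Picking up the reviewer's point above about an explicit schema-inference test, here is a minimal sketch of what such a test could look like, assuming the suite's existing withTempDirs, createFileStream, AddTextFileData and CheckAnswer helpers, the withSQLConf utility from the shared test harness, and the spark.sql.streaming.schemaInference option. The test name, the seed-file step and the exact expectations are illustrative; the test actually added in the PR may differ.

test("read new files in partitioned table with schema inference (sketch)") {
  withTempDirs { case (dir, tmp) =>
    val partitionFooSubDir = new File(dir, "partition=foo")

    withSQLConf("spark.sql.streaming.schemaInference" -> "true") {
      // Seed one file before the stream is created so that schema inference has
      // data to look at; an empty directory cannot be inferred.
      partitionFooSubDir.mkdirs()
      java.nio.file.Files.write(
        new File(partitionFooSubDir, "seed.json").toPath,
        "{'value': 'drop0'}".getBytes(java.nio.charset.StandardCharsets.UTF_8))

      // No user-specified schema: both "value" and the "partition" column should
      // come out of inference, and partition values should be readable.
      val fileStream = createFileStream("json", s"${dir.getCanonicalPath}")
      val filtered = fileStream.filter($"value" contains "keep")
      testStream(filtered)(
        AddTextFileData("{'value': 'keep1'}", partitionFooSubDir, tmp),
        CheckAnswer(("keep1", "foo"))
      )
    }
  }
}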

test("fault tolerance") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)
@@ -792,7 +820,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
assert(src.listFiles().size === numFiles)

- val files = spark.readStream.text(root.getCanonicalPath).as[String]
+ val files = spark.readStream.text(root.getCanonicalPath).as[(String, Int)]

// Note this query will use constant folding to eliminate the file scan.
// This is to avoid actually running a Spark job with 10000 tasks