Skip to content
4 changes: 4 additions & 0 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ csvDF = spark \

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

### Schema inference and partition of streaming DataFrames/Datasets

You can specify the schema for streaming DataFrames/Datasets to create with the API as shown in above example (i.e., `userSchema`). Alternatively, for file-based streaming source, you can config it to infer the schema. By default, the configure of streaming schema inference `spark.sql.streaming.schemaInference` is turned off. If the streaming DataFrame/Dataset is partitioned, the partition columns will only be inferred if the partition directories are present when the stream starts. When schema inference is turned off, for all file-based streaming sources except for `text` format, you have to include partition columns in the user provided schema.
Copy link
Contributor

@marmbrus marmbrus Sep 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.

Partition discovery does occur when subdirectories that are named /key=value/ are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/).


## Operations on streaming DataFrames/Datasets
You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. `select`, `where`, `groupBy`), to typed RDD-like operations (e.g. `map`, `filter`, `flatMap`). See the [SQL programming guide](sql-programming-guide.html) for more details. Let’s take a look at a few example operations that you can use.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,15 @@ case class DataSource(
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
format.inferSchema(
val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
val inferred = format.inferSchema(
sparkSession,
caseInsensitiveOptions,
fileCatalog.allFiles())

inferred.map { inferredSchema =>
StructType(inferredSchema ++ partitionCols)
}
}.getOrElse {
throw new AnalysisException("Unable to infer schema. It must be specified manually.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contains glob patterns
}

private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
Map("basePath" -> path)
} else {
Map()
}}

private val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
Expand Down Expand Up @@ -132,7 +139,7 @@ class FileStreamSource(
paths = files.map(_.path),
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = sourceOptions.optionMapWithoutPath)
options = optionsWithPartitionBasePath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkFilesExist = false)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,73 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

// =============== other tests ================

test("read new files in partitioned table without globbing, should read partition data") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably also have an explicit test for the case where schema inference is turned on (you implicitly test it some with the code changed below)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for it.

withTempDirs { case (dir, tmp) =>
val partitionFooSubDir = new File(dir, "partition=foo")
val partitionBarSubDir = new File(dir, "partition=bar")

val schema = new StructType().add("value", StringType).add("partition", StringType)
val fileStream = createFileStream("json", s"${dir.getCanonicalPath}", Some(schema))
val filtered = fileStream.filter($"value" contains "keep")
testStream(filtered)(
// Create new partition=foo sub dir and write to it
AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
CheckAnswer(("keep2", "foo")),

// Append to same partition=foo sub dir
AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo")),

// Create new partition sub dir and write to it
AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),

// Append to same partition=bar sub dir
AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
)
}
}

test("when schema inference is turned on, should read partition data") {
def createFile(content: String, src: File, tmp: File): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)
src.mkdirs()
require(stringToFile(tempFile, content).renameTo(finalFile))
}

withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
withTempDirs { case (dir, tmp) =>
val partitionFooSubDir = new File(dir, "partition=foo")
val partitionBarSubDir = new File(dir, "partition=bar")

// Create file in partition, so we can infer the schema.
createFile("{'value': 'drop0'}", partitionFooSubDir, tmp)

val fileStream = createFileStream("json", s"${dir.getCanonicalPath}")
val filtered = fileStream.filter($"value" contains "keep")
testStream(filtered)(
// Append to same partition=foo sub dir
AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
CheckAnswer(("keep2", "foo")),

// Append to same partition=foo sub dir
AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo")),

// Create new partition sub dir and write to it
AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),

// Append to same partition=bar sub dir
AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
)
}
}
}

test("fault tolerance") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)
Expand Down Expand Up @@ -792,7 +859,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
assert(src.listFiles().size === numFiles)

val files = spark.readStream.text(root.getCanonicalPath).as[String]
val files = spark.readStream.text(root.getCanonicalPath).as[(String, Int)]

// Note this query will use constant folding to eliminate the file scan.
// This is to avoid actually running a Spark job with 10000 tasks
Expand Down