Update doc and enhance the added test.
viirya committed Sep 23, 2016
commit e21536e7c20253cf2c04f80041592d8b095dbff4
4 changes: 3 additions & 1 deletion docs/structured-streaming-programming-guide.md
@@ -514,7 +514,9 @@ These examples generate streaming DataFrames that are untyped, meaning that the

### Schema inference and partition of streaming DataFrames/Datasets

You can specify the schema for streaming DataFrames/Datasets to create with the API as shown in above example (i.e., `userSchema`). Alternatively, for file-based streaming source, you can config it to infer the schema. By default, the configure of streaming schema inference `spark.sql.streaming.schemaInference` is turned off. If the streaming DataFrame/Dataset is partitioned, the partition columns will only be inferred if the partition directories are present when the stream starts. When schema inference is turned off, for all file-based streaming sources except for `text` format, you have to include partition columns in the user provided schema.
By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.
Contributor:
Schema inference can lead to many corner cases regarding if the inferred schema is different after restart. So I think we should use a stronger language that schema inference is not advisable in production uses.
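
For reference, a minimal sketch of the two modes described above, assuming a hypothetical input directory, hypothetical column names, and a `SparkSession` created for illustration (not part of this change):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("schema-sketch").getOrCreate()

// Default: supply the schema explicitly for a file-based streaming source.
val userSchema = new StructType().add("name", StringType).add("city", StringType)
val csvDF = spark.readStream
  .option("sep", ";")
  .schema(userSchema)            // required unless schema inference is enabled
  .csv("/path/to/csv-directory") // hypothetical input directory

// Ad-hoc alternative (not advisable for production, per the comment above):
// let Spark infer the schema from existing files.
spark.conf.set("spark.sql.streaming.schemaInference", "true")
val inferredDF = spark.readStream.json("/path/to/json-directory")
```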


Partition discovery does occur when subdirectories that are named `/key=value/` are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add `/data/year=2016/` when `/data/year=2015/` was present, but it is invalid to change the partitioning column (i.e. by creating the directory `/data/date=2016-04-17/`).
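
A minimal sketch of reading such a partitioned layout, assuming a hypothetical `/data` directory partitioned by `year` and reusing the assumed `spark` session from the sketch above:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Assumed directory layout:
//   /data/year=2015/part-0000.json
//   /data/year=2016/part-0000.json
val partitionedSchema = new StructType()
  .add("value", StringType)
  .add("year", IntegerType)      // partition column included in the user-provided schema

val partitionedDF = spark.readStream
  .schema(partitionedSchema)
  .json("/data")                 // "year" is filled in from the /year=.../ directory names
```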

## Operations on streaming DataFrames/Datasets
You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. `select`, `where`, `groupBy`), to typed RDD-like operations (e.g. `map`, `filter`, `flatMap`). See the [SQL programming guide](sql-programming-guide.html) for more details. Let’s take a look at a few example operations that you can use.
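
A brief sketch of such operations, assuming a hypothetical stream of device events with columns `device`, `deviceType`, and `signal` read from an assumed path:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

case class DeviceData(device: String, deviceType: String, signal: Double)

val spark = SparkSession.builder.appName("ops-sketch").getOrCreate()
import spark.implicits._

val deviceSchema = new StructType()
  .add("device", StringType)
  .add("deviceType", StringType)
  .add("signal", DoubleType)

// Hypothetical streaming input directory of JSON device events.
val df = spark.readStream.schema(deviceSchema).json("/path/to/devices")
val ds: Dataset[DeviceData] = df.as[DeviceData]

// Untyped, SQL-like operations
val strongSignals = df.select("device").where("signal > 10")

// Typed, RDD-like operations
val strongDevices = ds.filter(_.signal > 10).map(_.device)

// Running count of events per device type
val countsByType = df.groupBy("deviceType").count()
```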
@@ -102,6 +102,12 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext with Private
}
}

// Deletes the given file or directory recursively, as an action run outside the streaming engine.
case class DeleteFile(file: File) extends ExternalAction {
  def runAction(): Unit = {
    Utils.deleteRecursively(file)
  }
}

/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
def createFileStream(
format: String,
@@ -669,7 +675,15 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

// Append to same partition=bar sub dir
AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")),

// Delete the two partition dirs
DeleteFile(partitionFooSubDir),
Member:
@viirya why need to delete dirs in this test? It's flaky since the source maybe is listing files.

Member:
Removed them in #15699

Member Author:
@zsxwing I remember it is used to simulate the partition is deleted and re-inserted data. Thanks for fixing this!

DeleteFile(partitionBarSubDir),

AddTextFileData("{'value': 'keep6'}", partitionBarSubDir, tmp),
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"),
("keep6", "bar"))
)
}
}
@@ -95,6 +95,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
def addData(query: Option[StreamExecution]): (Source, Offset)
}

/** A trait that can be extended when testing a source. */
trait ExternalAction extends StreamAction {
  def runAction(): Unit
}

case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
override def toString: String = s"AddData to $source: ${data.mkString(",")}"

@@ -429,6 +434,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
failTest("Error adding data", e)
}

case e: ExternalAction =>
  e.runAction()

case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
verify(currentStream != null, "stream not running")
// Get the map of source index to the current source objects