From 2771d71898f187d479cdb0996c96494c0b53a344 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 25 Aug 2016 15:13:20 +0800 Subject: [PATCH 1/7] Pass path as basePath for partitionSpec creation if path is not globbing. --- .../streaming/FileStreamSource.scala | 10 +++++-- .../sql/streaming/FileStreamSourceSuite.scala | 28 +++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 0cfad659dc92..45bd3a41a364 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -104,14 +104,20 @@ class FileStreamSource( val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2) logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId") logTrace(s"Files are:\n\t" + files.mkString("\n\t")) - val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path") + val newOptions = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) { + options.get("path").map { path => + options + (("basePath", path)) + }.getOrElse(options) + } else { + options + } val newDataSource = DataSource( sparkSession, paths = files, userSpecifiedSchema = Some(schema), className = fileFormatClassName, - options = newOptions) + options = newOptions.filterKeys(_ != "path")) Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation())) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 47260a23c7ee..2f6890a24a06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -567,6 +567,34 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // =============== other tests ================ + test("read new files in partitioned table without globbing, should read partition data") { + withTempDirs { case (dir, tmp) => + val partitionFooSubDir = new File(dir, "partition=foo") + val partitionBarSubDir = new File(dir, "partition=bar") + + val schema = new StructType().add("value", StringType).add("partition", StringType) + val fileStream = createFileStream("json", s"${dir.getCanonicalPath}", Some(schema)) + val filtered = fileStream.filter($"value" contains "keep") + testStream(filtered)( + // Create new partition=foo sub dir and write to it + AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp), + CheckAnswer(("keep2", "foo")), + + // Append to same partition=1 sub dir + AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp), + CheckAnswer(("keep2", "foo"), ("keep3", "foo")), + + // Create new partition sub dir and write to it + AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp), + CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")), + + // Append to same partition=2 sub dir + AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp), + CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")) + ) + } + } + test("fault tolerance") { withTempDirs { case (src, tmp) => val fileStream = createFileStream("text", src.getCanonicalPath) From 6adf2e2d2a00689177036c685fad027eb6a13b9e Mon Sep 17 
00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 30 Aug 2016 18:23:53 +0800 Subject: [PATCH 2/7] Address comment. --- .../execution/streaming/FileStreamSource.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index fdf51b4bb1e9..79f9a7d65880 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -49,6 +49,14 @@ class FileStreamSource( fs.makeQualified(new Path(path)) // can contains glob patterns } + private val optionsWithPartitionBasePath = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) { + options.get("path").map { path => + sourceOptions.optionMapWithoutPath + (("basePath", path)) + }.getOrElse(sourceOptions.optionMapWithoutPath) + } else { + sourceOptions.optionMapWithoutPath + } + private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath) private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) @@ -129,20 +137,13 @@ class FileStreamSource( val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2) logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId") logTrace(s"Files are:\n\t" + files.mkString("\n\t")) - val newOptions = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) { - options.get("path").map { path => - sourceOptions.optionMapWithoutPath + (("basePath", path)) - }.getOrElse(sourceOptions.optionMapWithoutPath) - } else { - sourceOptions.optionMapWithoutPath - } val newDataSource = DataSource( sparkSession, paths = files.map(_.path), userSpecifiedSchema = Some(schema), className = fileFormatClassName, - options = newOptions) + options = optionsWithPartitionBasePath) Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation())) } From 04b61c7707b37f381cd35acd272a3d4d06c27b08 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 19 Sep 2016 20:30:46 +0800 Subject: [PATCH 3/7] Fix a test. --- .../apache/spark/sql/execution/datasources/DataSource.scala | 5 ++++- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 93154bd2ca69..692032edc99c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -197,10 +197,13 @@ case class DataSource( SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None) + val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields format.inferSchema( sparkSession, caseInsensitiveOptions, - fileCatalog.allFiles()) + fileCatalog.allFiles()).map { inferredSchema => + partitionCols.foldLeft(inferredSchema)((struct, field) => struct.add(field)) + } }.getOrElse { throw new AnalysisException("Unable to infer schema. 
It must be specified manually.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 0c2c969f2602..201a5799938f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -820,7 +820,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } assert(src.listFiles().size === numFiles) - val files = spark.readStream.text(root.getCanonicalPath).as[String] + val files = spark.readStream.text(root.getCanonicalPath).as[(String, Int)] // Note this query will use constant folding to eliminate the file scan. // This is to avoid actually running a Spark job with 10000 tasks From 23ba9a23ab835987ed326a9320cf8632a0783885 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 20 Sep 2016 10:25:15 +0800 Subject: [PATCH 4/7] Address reviewer comments. --- .../execution/datasources/DataSource.scala | 10 +++-- .../streaming/FileStreamSource.scala | 13 +++--- .../sql/streaming/FileStreamSourceSuite.scala | 40 +++++++++++++++++++ 3 files changed, 52 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 692032edc99c..5dd3ed6644e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -198,12 +198,14 @@ case class DataSource( }.toArray val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None) val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields - format.inferSchema( + val inferred = format.inferSchema( sparkSession, caseInsensitiveOptions, - fileCatalog.allFiles()).map { inferredSchema => - partitionCols.foldLeft(inferredSchema)((struct, field) => struct.add(field)) - } + fileCatalog.allFiles()) + + inferred.map { inferredSchema => + StructType(inferredSchema ++ partitionCols) + } }.getOrElse { throw new AnalysisException("Unable to infer schema. 
It must be specified manually.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index e1e7c59f3af8..47bdafa44316 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -49,13 +49,12 @@ class FileStreamSource( fs.makeQualified(new Path(path)) // can contains glob patterns } - private val optionsWithPartitionBasePath = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) { - options.get("path").map { path => - sourceOptions.optionMapWithoutPath + (("basePath", path)) - }.getOrElse(sourceOptions.optionMapWithoutPath) - } else { - sourceOptions.optionMapWithoutPath - } + private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ { + if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) { + Map("basePath" -> path) + } else { + Map() + }} private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 201a5799938f..baebe92107f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -636,6 +636,46 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("when schema inference is turned on, should read partition data") { + def createFile(content: String, src: File, tmp: File): Unit = { + val tempFile = Utils.tempFileWith(new File(tmp, "text")) + val finalFile = new File(src, tempFile.getName) + src.mkdirs() + require(stringToFile(tempFile, content).renameTo(finalFile)) + } + + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + withTempDirs { case (dir, tmp) => + val partitionFooSubDir = new File(dir, "partition=foo") + val partitionBarSubDir = new File(dir, "partition=bar") + + // Create files in partitions, so we can infer the schema. 
+ createFile("{'value': 'drop0'}", partitionFooSubDir, tmp) + createFile("{'value': 'drop0'}", partitionBarSubDir, tmp) + + val fileStream = createFileStream("json", s"${dir.getCanonicalPath}") + val filtered = fileStream.filter($"value" contains "keep") + testStream(filtered)( + // Create new partition=foo sub dir and write to it + AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp), + CheckAnswer(("keep2", "foo")), + + // Append to same partition=1 sub dir + AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp), + CheckAnswer(("keep2", "foo"), ("keep3", "foo")), + + // Create new partition sub dir and write to it + AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp), + CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")), + + // Append to same partition=2 sub dir + AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp), + CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")) + ) + } + } + } + test("fault tolerance") { withTempDirs { case (src, tmp) => val fileStream = createFileStream("text", src.getCanonicalPath) From 5b101aba62efd34077495eb55159ec1b93d2c90e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 20 Sep 2016 11:00:13 +0800 Subject: [PATCH 5/7] Fix wrong comment, slightly tailor the new test. --- .../spark/sql/streaming/FileStreamSourceSuite.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index baebe92107f5..61e9687162cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -621,7 +621,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp), CheckAnswer(("keep2", "foo")), - // Append to same partition=1 sub dir + // Append to same partition=foo sub dir AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp), CheckAnswer(("keep2", "foo"), ("keep3", "foo")), @@ -629,7 +629,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp), CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")), - // Append to same partition=2 sub dir + // Append to same partition=bar sub dir AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp), CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")) ) @@ -649,18 +649,17 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val partitionFooSubDir = new File(dir, "partition=foo") val partitionBarSubDir = new File(dir, "partition=bar") - // Create files in partitions, so we can infer the schema. + // Create file in partition, so we can infer the schema. 
createFile("{'value': 'drop0'}", partitionFooSubDir, tmp) - createFile("{'value': 'drop0'}", partitionBarSubDir, tmp) val fileStream = createFileStream("json", s"${dir.getCanonicalPath}") val filtered = fileStream.filter($"value" contains "keep") testStream(filtered)( - // Create new partition=foo sub dir and write to it + // Append to same partition=foo sub dir AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp), CheckAnswer(("keep2", "foo")), - // Append to same partition=1 sub dir + // Append to same partition=foo sub dir AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp), CheckAnswer(("keep2", "foo"), ("keep3", "foo")), @@ -668,7 +667,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp), CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")), - // Append to same partition=2 sub dir + // Append to same partition=bar sub dir AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp), CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")) ) From 541dfdc637b5373c384249a601d0a3e8486adb07 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 20 Sep 2016 11:24:06 +0800 Subject: [PATCH 6/7] Update programming guide. --- docs/structured-streaming-programming-guide.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index c7ed3b04bced..591e06b7e411 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -512,6 +512,10 @@ csvDF = spark \ These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document. +### Schema inference and partition of streaming DataFrames/Datasets + +You can specify the schema for streaming DataFrames/Datasets to create with the API as shown in above example (i.e., `userSchema`). Alternatively, for file-based streaming source, you can config it to infer the schema. By default, the configure of streaming schema inference `spark.sql.streaming.schemaInference` is turned off. If the streaming DataFrame/Dataset is partitioned, the partition columns will only be inferred if the partition directories are present when the stream starts. When schema inference is turned off, for all file-based streaming sources except for `text` format, you have to include partition columns in the user provided schema. + ## Operations on streaming DataFrames/Datasets You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. `select`, `where`, `groupBy`), to typed RDD-like operations (e.g. `map`, `filter`, `flatMap`). See the [SQL programming guide](sql-programming-guide.html) for more details. Let’s take a look at a few example operations that you can use. 
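A minimal, illustrative sketch of the behaviour the paragraph above describes — streaming schema inference over a partitioned JSON directory. This program is not part of the patch series; the `/tmp/stream-input` layout, the `value` field, and the `partition` column name are assumptions made for the example.

```scala
import org.apache.spark.sql.SparkSession

object SchemaInferenceStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming-schema-inference")
      .getOrCreate()
    import spark.implicits._

    // Re-enable schema inference for file streams (it is off by default).
    spark.conf.set("spark.sql.streaming.schemaInference", "true")

    // Assumed directory layout:
    //   /tmp/stream-input/partition=foo/*.json
    //   /tmp/stream-input/partition=bar/*.json
    // With this patch series, the inferred schema should contain both the JSON
    // field ("value") and the discovered partition column ("partition").
    val df = spark.readStream.json("/tmp/stream-input")
    df.printSchema()

    val query = df.filter($"value".contains("keep"))
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```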
From e21536e7c20253cf2c04f80041592d8b095dbff4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 23 Sep 2016 03:53:14 +0000 Subject: [PATCH 7/7] Update doc and enhance the added test. --- docs/structured-streaming-programming-guide.md | 4 +++- .../sql/streaming/FileStreamSourceSuite.scala | 16 +++++++++++++++- .../apache/spark/sql/streaming/StreamTest.scala | 8 ++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 591e06b7e411..2e6df94823d3 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -514,7 +514,9 @@ These examples generate streaming DataFrames that are untyped, meaning that the ### Schema inference and partition of streaming DataFrames/Datasets -You can specify the schema for streaming DataFrames/Datasets to create with the API as shown in above example (i.e., `userSchema`). Alternatively, for file-based streaming source, you can config it to infer the schema. By default, the configure of streaming schema inference `spark.sql.streaming.schemaInference` is turned off. If the streaming DataFrame/Dataset is partitioned, the partition columns will only be inferred if the partition directories are present when the stream starts. When schema inference is turned off, for all file-based streaming sources except for `text` format, you have to include partition columns in the user provided schema. +By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`. + +Partition discovery does occur when subdirectories that are named `/key=value/` are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add `/data/year=2016/` when `/data/year=2015/` was present, but it is invalid to change the partitioning column (i.e. by creating the directory `/data/date=2016-04-17/`). ## Operations on streaming DataFrames/Datasets You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. `select`, `where`, `groupBy`), to typed RDD-like operations (e.g. `map`, `filter`, `flatMap`). See the [SQL programming guide](sql-programming-guide.html) for more details. Let’s take a look at a few example operations that you can use. 
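Before the test-suite changes below, a small sketch of the other mode the reworded paragraph covers: providing the schema explicitly and listing the partition column in it, so Spark fills it in from the directory names. Again this is illustrative only; the `/data/input` path, the `value` field, and the `partition` column name are assumptions, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object PartitionedFileStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitioned-file-stream")
      .getOrCreate()
    import spark.implicits._

    // The user-provided schema includes the partition column, so values parsed
    // from directory names such as partition=foo are filled in by Spark.
    val schema = new StructType()
      .add("value", StringType)
      .add("partition", StringType)

    // Assumed layout:
    //   /data/input/partition=foo/*.json
    //   /data/input/partition=bar/*.json
    val stream = spark.readStream
      .schema(schema)
      .json("/data/input")

    val query = stream.filter($"value".contains("keep"))
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```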
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 352b47170e9a..3157afe5a56c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -102,6 +102,12 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext with Private } } + case class DeleteFile(file: File) extends ExternalAction { + def runAction(): Unit = { + Utils.deleteRecursively(file) + } + } + /** Use `format` and `path` to create FileStreamSource via DataFrameReader */ def createFileStream( format: String, @@ -669,7 +675,15 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // Append to same partition=bar sub dir AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp), - CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")) + CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")), + + // Delete the two partition dirs + DeleteFile(partitionFooSubDir), + DeleteFile(partitionBarSubDir), + + AddTextFileData("{'value': 'keep6'}", partitionBarSubDir, tmp), + CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"), + ("keep6", "bar")) ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 6c5b170d9c7c..aa6515bc7a90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -95,6 +95,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { def addData(query: Option[StreamExecution]): (Source, Offset) } + /** A trait that can be extended when testing a source. */ + trait ExternalAction extends StreamAction { + def runAction(): Unit + } + case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData { override def toString: String = s"AddData to $source: ${data.mkString(",")}" @@ -429,6 +434,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { failTest("Error adding data", e) } + case e: ExternalAction => + e.runAction() + case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) => verify(currentStream != null, "stream not running") // Get the map of source index to the current source objects
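For context on the new `ExternalAction` hook added above: any external side effect can now be interleaved with `AddTextFileData`/`CheckAnswer` steps by wrapping it in an `ExternalAction`, exactly as `DeleteFile` does. A hypothetical sketch under that assumption — `RenameDir` is not part of the patch and would have to live inside a suite extending `FileStreamSourceTest`, since `ExternalAction` is an inner trait of `StreamTest`:

```scala
import java.io.File

// Defined inside a suite that extends FileStreamSourceTest.
case class RenameDir(from: File, to: File) extends ExternalAction {
  override def runAction(): Unit = {
    require(from.renameTo(to), s"failed to rename $from to $to")
  }
}

// Used inside testStream(...) like DeleteFile, e.g.:
//   AddTextFileData("{'value': 'keep1'}", partitionFooSubDir, tmp),
//   CheckAnswer(("keep1", "foo")),
//   RenameDir(partitionFooSubDir, new File(dir, "partition=baz")),
//   ...
```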