Skip to content

Commit 060b329

Browse files
committed
add ut
1 parent 8a33aba commit 060b329

File tree

1 file changed

+22
-2
lines changed

1 file changed

+22
-2
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.spark.sql.execution
1919

20-
import org.apache.spark.sql.test.SharedSparkSession
20+
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
21+
import org.apache.spark.sql.streaming.StreamTest
2122

22-
class QueryPlanningTrackerEndToEndSuite extends SharedSparkSession {
23+
class QueryPlanningTrackerEndToEndSuite extends StreamTest {
24+
import testImplicits._
2325

2426
test("programmatic API") {
2527
val df = spark.range(1000).selectExpr("count(*)")
@@ -38,4 +40,22 @@ class QueryPlanningTrackerEndToEndSuite extends SharedSparkSession {
3840
assert(tracker.rules.nonEmpty)
3941
}
4042

43+
test("streaming") {
44+
val inputData = MemoryStream[Int]
45+
val df = inputData.toDF()
46+
47+
def assertStatus(stream: StreamExecution): Unit = {
48+
stream.processAllAvailable()
49+
val tracker = stream.lastExecution.tracker
50+
assert(tracker.phases.keys == Set("analysis", "optimization", "planning"))
51+
assert(tracker.rules.nonEmpty)
52+
}
53+
54+
testStream(df)(
55+
StartStream(),
56+
AddData(inputData, 1, 2, 3),
57+
Execute(assertStatus),
58+
StopStream)
59+
}
60+
4161
}

0 commit comments

Comments
 (0)