Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix StreamingQuery explain command
  • Loading branch information
zsxwing committed Feb 15, 2017
commit 3b6c86a5581df4bdb9a94eac095c9c1ee1363f47
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,21 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
* @param output output schema
* @param extended whether to do extended explain or not
* @param codegen whether to output generated code from whole-stage codegen or not
* @param streaming whether it's a streaming plan
*/
case class ExplainCommand(
logicalPlan: LogicalPlan,
override val output: Seq[Attribute] =
Seq(AttributeReference("plan", StringType, nullable = true)()),
extended: Boolean = false,
codegen: Boolean = false)
codegen: Boolean = false,
streaming: Boolean = false)
extends RunnableCommand {

// Run through the optimizer to generate the physical plan.
override def run(sparkSession: SparkSession): Seq[Row] = try {
val queryExecution =
if (logicalPlan.isStreaming) {
if (streaming) {
// This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
// output mode does not matter since there is no `Sink`.
new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ class StreamExecution(
if (lastExecution == null) {
"No physical plan. Waiting for data."
} else {
val explain = ExplainCommand(lastExecution.logical, extended = extended)
val explain = ExplainCommand(lastExecution.logical, extended = extended, streaming = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this means that this code will always return an updated plan for the last batch showing which data files were read instead of just referring to it as a StreamingRelation. We wouldn't have the bug if we had just used logicalPlan instead of lastExecution.logicalPlan, right? Then the problem would be that the logicalPlan may contain errors though?

sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
.map(_.getString(0)).mkString("\n")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.util.control.ControlThrowable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

Expand Down Expand Up @@ -277,10 +278,11 @@ class StreamSuite extends StreamTest {

test("explain") {
val inputData = MemoryStream[String]
val df = inputData.toDS().map(_ + "foo")
val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
// Test `explain` not throwing errors
df.explain()
val q = df.writeStream.queryName("memory_explain").format("memory").start()
val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
.start()
.asInstanceOf[StreamingQueryWrapper]
.streamingQuery
try {
Expand All @@ -294,12 +296,16 @@ class StreamSuite extends StreamTest {
// `extended = false` only displays the physical plan.
assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithoutExtended.contains("StateStoreRestore"))

val explainWithExtended = q.explainInternal(true)
// `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
// plan.
assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithExtended.contains("StateStoreRestore"))
} finally {
q.stop()
}
Expand Down