
Commit 090e8c2

Dumping truncated plans to a file
1 parent f9ff756 commit 090e8c2

File tree (5 files changed: +166 -44 lines changed)

- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 29 additions & 15 deletions
@@ -17,11 +17,13 @@

 package org.apache.spark.sql.catalyst.trees

+import java.io.Writer
 import java.util.UUID

 import scala.collection.Map
 import scala.reflect.ClassTag

+import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.commons.lang3.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._

@@ -469,7 +471,21 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def treeString: String = treeString(verbose = true)

   def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
-    generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString
+    val writer = new StringBuilderWriter()
+    try {
+      treeString(writer, verbose, addSuffix, None)
+      writer.toString
+    } finally {
+      writer.close()
+    }
+  }
+
+  def treeString(
+      writer: Writer,
+      verbose: Boolean,
+      addSuffix: Boolean,
+      maxFields: Option[Int]): Unit = {
+    generateTreeString(0, Nil, writer, verbose, "", addSuffix)
   }

   /**

@@ -521,7 +537,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   protected def innerChildren: Seq[TreeNode[_]] = Seq.empty

   /**
-   * Appends the string representation of this node and its children to the given StringBuilder.
+   * Appends the string representation of this node and its children to the given Writer.
    *
    * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
    * depth `i + 1` is the last child of its own parent node. The depth of the root node is 0, and

@@ -532,44 +548,42 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      builder: StringBuilder,
+      writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
+      addSuffix: Boolean = false): Unit = {

     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
-        builder.append(if (isLast) "   " else ":  ")
+        writer.write(if (isLast) "   " else ":  ")
       }
-      builder.append(if (lastChildren.last) "+- " else ":- ")
+      writer.write(if (lastChildren.last) "+- " else ":- ")
     }

     val str = if (verbose) {
       if (addSuffix) verboseStringWithSuffix else verboseString
     } else {
       simpleString
     }
-    builder.append(prefix)
-    builder.append(str)
-    builder.append("\n")
+    writer.write(prefix)
+    writer.write(str)
+    writer.write("\n")

     if (innerChildren.nonEmpty) {
       innerChildren.init.foreach(_.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
         addSuffix = addSuffix))
       innerChildren.last.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
         addSuffix = addSuffix)
     }

     if (children.nonEmpty) {
       children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, builder, verbose, prefix, addSuffix))
+        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
       children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, builder, verbose, prefix, addSuffix)
+        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
     }
-
-    builder
   }

   /**
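The key change above: generateTreeString now writes each fragment straight into a java.io.Writer and returns Unit instead of threading a StringBuilder, so a large plan tree no longer has to be materialized in memory before it can be written out. A minimal standalone sketch of the same pattern — the Node class below is hypothetical, not Spark code:

import java.io.{StringWriter, Writer}

// Hypothetical minimal tree mirroring the Writer-based rendering above:
// each element of lastChildren records whether the ancestor at that depth
// was the last child of its parent, which picks the "   "/":  " gutter.
case class Node(name: String, children: Seq[Node] = Nil) {
  def writeTree(writer: Writer, lastChildren: Seq[Boolean] = Nil): Unit = {
    if (lastChildren.nonEmpty) {
      lastChildren.init.foreach(isLast => writer.write(if (isLast) "   " else ":  "))
      writer.write(if (lastChildren.last) "+- " else ":- ")
    }
    writer.write(name + "\n")
    if (children.nonEmpty) {
      children.init.foreach(_.writeTree(writer, lastChildren :+ false))
      children.last.writeTree(writer, lastChildren :+ true)
    }
  }
}

// Any Writer works: a StringWriter here, a file writer just as well.
val out = new StringWriter()
Node("Project", Seq(Node("Filter", Seq(Node("Scan"))), Node("Exchange"))).writeTree(out)
print(out.toString)
// Project
// :- Filter
// :  +- Scan
// +- Exchange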

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 52 additions & 16 deletions
@@ -17,9 +17,13 @@

 package org.apache.spark.sql.execution

+import java.io.{BufferedWriter, OutputStreamWriter, Writer}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}

+import org.apache.commons.io.output.StringBuilderWriter
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow

@@ -189,23 +193,38 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     """.stripMargin.trim
   }

+  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
+    try f(writer)
+    catch {
+      case e: AnalysisException => writer.write(e.toString)
+    }
+  }
+
+  private def writePlans(writer: Writer, maxFields: Option[Int]): Unit = {
+    val (verbose, addSuffix) = (true, false)
+
+    writer.write("== Parsed Logical Plan ==\n")
+    writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
+    writer.write("\n== Analyzed Logical Plan ==\n")
+    val analyzedOutput = stringOrError(Utils.truncatedString(
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
+    writer.write(analyzedOutput)
+    writer.write("\n")
+    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
+    writer.write("\n== Optimized Logical Plan ==\n")
+    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
+    writer.write("\n== Physical Plan ==\n")
+    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
+  }
+
   override def toString: String = withRedaction {
-    def output = Utils.truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
-    val analyzedPlan = Seq(
-      stringOrError(output),
-      stringOrError(analyzed.treeString(verbose = true))
-    ).filter(_.nonEmpty).mkString("\n")
-
-    s"""== Parsed Logical Plan ==
-       |${stringOrError(logical.treeString(verbose = true))}
-       |== Analyzed Logical Plan ==
-       |$analyzedPlan
-       |== Optimized Logical Plan ==
-       |${stringOrError(optimizedPlan.treeString(verbose = true))}
-       |== Physical Plan ==
-       |${stringOrError(executedPlan.treeString(verbose = true))}
-    """.stripMargin.trim
+    val writer = new StringBuilderWriter()
+    try {
+      writePlans(writer, None)
+      writer.toString
+    } finally {
+      writer.close()
+    }
   }

   def stringWithStats: String = withRedaction {

@@ -250,5 +269,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     def codegenToSeq(): Seq[(String, String)] = {
       org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
     }
+
+    /**
+     * Dumps debug information about query execution into the specified file.
+     */
+    def toFile(path: String): Unit = {
+      val filePath = new Path(path)
+      val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
+
+      try {
+        writePlans(writer, Some(Int.MaxValue))
+        writer.write("\n== Whole Stage Codegen ==\n")
+        org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+      } finally {
+        writer.close()
+      }
+    }
   }
 }
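With writePlans and the new toFile in place, all four plans plus the generated code can be dumped in one call. A usage sketch consistent with the tests below (the local path is illustrative; since the path is resolved through Hadoop's FileSystem API, paths on HDFS and similar file systems should be accepted as well):

val df = spark.range(0, 10)

// Writes the "== Parsed/Analyzed/Optimized Logical Plan ==" and
// "== Physical Plan ==" sections plus a "== Whole Stage Codegen ==" section.
// toFile passes Some(Int.MaxValue) as maxFields, so plans are not truncated.
df.queryExecution.debug.toFile("/tmp/plans.txt")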

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 7 additions & 6 deletions
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.execution

+import java.io.Writer
 import java.util.Locale
 import java.util.function.Supplier

@@ -450,11 +451,11 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      builder: StringBuilder,
+      writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
-    child.generateTreeString(depth, lastChildren, builder, verbose, "")
+      addSuffix: Boolean = false): Unit = {
+    child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
   }

   override def needCopyResult: Boolean = false

@@ -726,11 +727,11 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      builder: StringBuilder,
+      writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
-    child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ")
+      addSuffix: Boolean = false): Unit = {
+    child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
   }

   override def needStopCheck: Boolean = true
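For context: WholeStageCodegenExec prints no node of its own but re-renders its child with a *(<codegenStageId>) prefix, while InputAdapter resets the prefix so operators below the codegen boundary appear unmarked. Illustrative output (operator shapes, attribute ids, and split counts vary with version and parallelism):

spark.range(0, 10).selectExpr("id + 1").explain()
// == Physical Plan ==
// *(1) Project [(id#0L + 1) AS (id + 1)#2L]
// +- *(1) Range (0, 10, step=1, splits=2)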

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

Lines changed: 19 additions & 7 deletions
@@ -17,10 +17,13 @@

 package org.apache.spark.sql.execution

+import java.io.Writer
 import java.util.Collections

 import scala.collection.JavaConverters._

+import org.apache.commons.io.output.StringBuilderWriter
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._

@@ -30,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
-import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

@@ -70,15 +72,25 @@ package object debug {
    * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
    */
   def codegenString(plan: SparkPlan): String = {
+    val writer = new StringBuilderWriter()
+
+    try {
+      writeCodegen(writer, plan)
+      writer.toString
+    } finally {
+      writer.close()
+    }
+  }
+
+  def writeCodegen(writer: Writer, plan: SparkPlan): Unit = {
     val codegenSeq = codegenStringSeq(plan)
-    var output = s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n"
+    writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
     for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
-      output += s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n"
-      output += subtree
-      output += "\nGenerated code:\n"
-      output += s"${code}\n"
+      writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+      writer.write(subtree)
+      writer.write("\nGenerated code:\n")
+      writer.write(s"${code}\n")
     }
-    output
   }

   /**
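codegenString is now a thin wrapper over writeCodegen, so the codegen report can either be materialized as a String or streamed to any Writer. A hedged usage sketch (assumes a running spark session, e.g. in the shell):

import java.io.PrintWriter
import org.apache.spark.sql.execution.debug._

val plan = spark.range(0, 10).queryExecution.executedPlan

// Materialized, as before:
val report: String = codegenString(plan)

// Streamed, without building the whole report in memory:
val out = new PrintWriter(System.out)
try writeCodegen(out, plan) finally out.flush()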

sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala

Lines changed: 59 additions & 0 deletions
@@ -16,11 +16,70 @@
  */
 package org.apache.spark.sql.execution

+import scala.io.Source
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.test.SharedSQLContext

 class QueryExecutionSuite extends SharedSQLContext {
+  def checkDumpedPlans(path: String, expected: Int): Unit = {
+    assert(Source.fromFile(path).getLines.toList
+      .takeWhile(_ != "== Whole Stage Codegen ==") == List(
+      "== Parsed Logical Plan ==",
+      s"Range (0, $expected, step=1, splits=Some(2))",
+      "",
+      "== Analyzed Logical Plan ==",
+      "id: bigint",
+      s"Range (0, $expected, step=1, splits=Some(2))",
+      "",
+      "== Optimized Logical Plan ==",
+      s"Range (0, $expected, step=1, splits=Some(2))",
+      "",
+      "== Physical Plan ==",
+      s"*(1) Range (0, $expected, step=1, splits=2)",
+      ""))
+  }
+  test("dumping query execution info to a file") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path)
+
+      checkDumpedPlans(path, expected = 10)
+    }
+  }
+
+  test("dumping query execution info to an existing file") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path)
+
+      val df2 = spark.range(0, 1)
+      df2.queryExecution.debug.toFile(path)
+      checkDumpedPlans(path, expected = 1)
+    }
+  }
+
+  test("dumping query execution info to non-existing folder") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/newfolder/plans.txt"
+      val df = spark.range(0, 100)
+      df.queryExecution.debug.toFile(path)
+      checkDumpedPlans(path, expected = 100)
+    }
+  }
+
+  test("dumping query execution info by invalid path") {
+    val path = "1234567890://plans.txt"
+    val exception = intercept[IllegalArgumentException] {
+      spark.range(0, 100).queryExecution.debug.toFile(path)
+    }
+
+    assert(exception.getMessage.contains("Illegal character in scheme name"))
+  }
+
   test("toString() exception/error handling") {
     spark.experimental.extraStrategies = Seq(
       new SparkStrategy {
