Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added maxDepth to treeString which limits the depth of the printed string.
  • Loading branch information
DaveDeCaprio committed Nov 18, 2018
commit 3a9743fbc89358055c37cc45437f191fc5f15957
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.trees

import java.io.Writer
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.Map
import scala.reflect.ClassTag
Expand All @@ -29,6 +30,8 @@ import org.json4s.JsonAST._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.ScalaReflection._
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -480,14 +483,22 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
}
}

def treeString(
def treeString(
writer: Writer,
verbose: Boolean,
addSuffix: Boolean): Unit = {
treeString(writer, verbose, addSuffix, TreeNode.maxTreeToStringDepth)
}

def treeString(
writer: Writer,
verbose: Boolean,
addSuffix: Boolean): Unit = {
generateTreeString(0, Nil, writer, verbose, "", addSuffix)
addSuffix: Boolean,
maxDepth: Int): Unit = {
generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxDepth)
}

/**
/**
* Returns a string representation of the nodes in this tree, where each operator is numbered.
* The numbers can be used with [[TreeNode.apply]] to easily access specific subtrees.
*
Expand Down Expand Up @@ -550,7 +561,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): Unit = {
addSuffix: Boolean = false,
maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {

if (depth > 0) {
lastChildren.init.foreach { isLast =>
Expand All @@ -559,30 +571,42 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
writer.write(if (lastChildren.last) "+- " else ":- ")
}

val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix else verboseString
} else {
simpleString
}
writer.write(prefix)
writer.write(str)
writer.write("\n")

if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
addSuffix = addSuffix))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
addSuffix = addSuffix)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
children.last.generateTreeString(
depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
}
if (depth < maxDepth) {
val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix else verboseString
} else {
simpleString
}
writer.write(prefix)
writer.write(str)
writer.write("\n")

if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
addSuffix = addSuffix, maxDepth = maxDepth))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
addSuffix = addSuffix, maxDepth = maxDepth)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxDepth))
children.last.generateTreeString(
depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxDepth)
}
}
else {
if (TreeNode.treeDepthWarningPrinted.compareAndSet(false, true)) {
logWarn(
"Truncated the string representation of a plan since it was nested too deeply. " +
"This behavior can be adjusted by setting 'spark.debug.maxToStringTreeDepth' in " +
"SparkEnv.conf.")
}
writer.write(prefix)
writer.write("...\n")
}
}

/**
Expand Down Expand Up @@ -701,3 +725,23 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
case _ => false
}
}

object TreeNode {
  /**
   * String representations of large, deeply nested query plans can get extremely large. To
   * limit the impact, tree strings are truncated once the tree gets deeper than a maximum depth.
   * This is the depth used when 'spark.debug.maxToStringTreeDepth' is not set in the conf of
   * the active SparkEnv, or when there is no active SparkEnv.
   */
  val DEFAULT_MAX_TO_STRING_TREE_DEPTH = 15

  /**
   * Returns the maximum depth to which a tree is rendered as a string.
   *
   * The value is looked up on every call: it is read from the
   * 'spark.debug.maxToStringTreeDepth' entry of the active SparkEnv's conf and falls back to
   * [[DEFAULT_MAX_TO_STRING_TREE_DEPTH]] when `SparkEnv.get` returns null.
   */
  def maxTreeToStringDepth: Int = {
    // Read SparkEnv.get exactly once: checking it for null and then fetching it a second time
    // could observe two different values and fail with a NullPointerException.
    Option(SparkEnv.get)
      .map(_.conf.getInt("spark.debug.maxToStringTreeDepth", DEFAULT_MAX_TO_STRING_TREE_DEPTH))
      .getOrElse(DEFAULT_MAX_TO_STRING_TREE_DEPTH)
  }

  /** Whether we have warned about plan string truncation yet. */
  private val treeDepthWarningPrinted = new AtomicBoolean(false)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure where to put this code, so I made a TreeNode companion object. If there is a better place, let me know.

Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,23 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
}

override def toString: String = withRedaction {
val writer = new StringBuilderWriter()
try {
writePlans(writer)
writer.toString
} finally {
writer.close()
}
def output = Utils.truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
val analyzedPlan = Seq(
stringOrError(output),
stringOrError(analyzed.treeString(verbose = true))
).filter(_.nonEmpty).mkString("\n")

val builder = new StringBuilder
builder.append("== Parsed Logical Plan ==\n")
builder.append(stringOrError(logical.treeString(verbose = true)))
builder.append("== Analyzed Logical Plan ==\n")
builder.append(analyzedPlan)
builder.append("== Optimized Logical Plan ==\n")
builder.append(stringOrError(optimizedPlan.treeString(verbose = true)))
builder.append("== Physical Plan ==\n")
builder.append(stringOrError(executedPlan.treeString(verbose = true)))
builder.toString
}

def stringWithStats: String = withRedaction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
addSuffix: Boolean = false,
maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, prefix, addSuffix, maxDepth)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefix and addSuffix are ignored in the original code, but I can't tell why. It seems like they should be passed through from parent to child here. I adjusted this, but it might not be a bug, or maybe it should go in a separate PR.

Suggested change
child.generateTreeString(depth, lastChildren, writer, verbose, prefix, addSuffix, maxDepth)
child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false, maxDepth)

}

override def needCopyResult: Boolean = false
Expand Down Expand Up @@ -731,8 +732,16 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
addSuffix: Boolean = false,
maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {
child.generateTreeString(
depth,
lastChildren,
writer,
verbose,
s"*($codegenStageId) ",
false,
maxDepth)
}

override def needStopCheck: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.test.SharedSQLContext

/** Two-field row type used by the tests in this file to build small DataFrames. */
case class Simple(a: String, b: Int)

class QueryExecutionSuite extends SharedSQLContext {
def checkDumpedPlans(path: String, expected: Int): Unit = {
assert(Source.fromFile(path).getLines.toList
Expand Down Expand Up @@ -108,4 +110,16 @@ class QueryExecutionSuite extends SharedSQLContext {
val error = intercept[Error](qe.toString)
assert(error.getMessage.contains("error"))
}

test("toString() tree depth") {
  import testImplicits._

  val rows = Seq(Simple("a", 1), Simple("b", 3), Simple("c", 4))

  // Join the same three-row DataFrame onto itself 29 times on column "a" to get a deeply
  // nested plan.
  val joinCount = 29
  val deeplyNested = (0 until joinCount).foldLeft(rows.toDF()) { (joinedSoFar, _) =>
    joinedSoFar.join(rows.toDF(), "a")
  }

  // The rendered optimized plan must stay short even though the plan itself is deep.
  val planLines = deeplyNested.queryExecution.optimizedPlan.toString.split("\n")
  assert(planLines.length < 30)
}
}