39 commits
22bd4bd  Merge pull request #1 from apache/master (DaveDeCaprio, Nov 27, 2018)
b7f964d  Added a configurable limit on the maximum length of a plan debug string. (DaveDeCaprio, Nov 28, 2018)
3f4fe1f  Removed unneeded imports (DaveDeCaprio, Nov 28, 2018)
30e4348  Moved withSizeLimitedWriter to treeString function that uses StringWr… (DaveDeCaprio, Nov 28, 2018)
5528ca1  Fixed formatting (DaveDeCaprio, Nov 28, 2018)
3171cf3  Converted to javadoc style multiline comment (DaveDeCaprio, Nov 28, 2018)
3ffdc6a  Fixed scalastyle formatting of imports (DaveDeCaprio, Nov 28, 2018)
45a60fc  Have size limit cut off right at the correct number of characters. (DaveDeCaprio, Nov 29, 2018)
9678799  Changed name to remove the debug from the config parameter name (DaveDeCaprio, Nov 29, 2018)
a5af842  Fixed formatting error (DaveDeCaprio, Nov 29, 2018)
a4be985  Changed length default to Long.MaxValue to turn off behavior unless i… (DaveDeCaprio, Dec 3, 2018)
f0f75c2  Merge branch 'master' of https://github.com/apache/spark into text-pl… (DaveDeCaprio, Dec 3, 2018)
1b692a0  Added test case for not limiting plan length and tested with a defaul… (DaveDeCaprio, Dec 4, 2018)
22fe117  Correctly added test case missed in the previous commit (DaveDeCaprio, Dec 4, 2018)
be3f265  Added more documentation of the plan length parameter. (DaveDeCaprio, Dec 8, 2018)
f6d0efc  Merge branch 'master' of https://github.com/apache/spark into text-pl… (DaveDeCaprio, Dec 8, 2018)
855f540  Removed tab (DaveDeCaprio, Dec 8, 2018)
e83f5f2  Merge branch 'master' of https://github.com/apache/spark into text-pl… (DaveDeCaprio, Jan 5, 2019)
db663b7  Merge branch 'master' of https://github.com/apache/spark into text-pl… (DaveDeCaprio, Jan 20, 2019)
2eecbfa  Added plan size limits to StringConcat (DaveDeCaprio, Jan 21, 2019)
4082aa3  Scalastyle (DaveDeCaprio, Jan 21, 2019)
f9085e7  Incorporated changes from code review (DaveDeCaprio, Jan 21, 2019)
b3d43b7  Missed one error (DaveDeCaprio, Jan 21, 2019)
e470ab2  Cleaned up append function and added tracking of the full plan length. (DaveDeCaprio, Jan 21, 2019)
35bc1d5  Got rid of unneeded "availableLength" flag. (DaveDeCaprio, Jan 21, 2019)
5ec58c8  Fixed errors missed (DaveDeCaprio, Jan 21, 2019)
bdfaf28  Updated to handle nulls again. (DaveDeCaprio, Jan 22, 2019)
0cfcb4e  Tabs to spaces (DaveDeCaprio, Jan 22, 2019)
eb69888  Remove useless test. (DaveDeCaprio, Jan 22, 2019)
daf02f2  Addressed code review comments. (DaveDeCaprio, Mar 8, 2019)
7d89388  Missed some Boolean->Unit updates (DaveDeCaprio, Mar 8, 2019)
4f56e48  Missed some Boolean->Unit updates (DaveDeCaprio, Mar 8, 2019)
a090fbb  Code formatting (DaveDeCaprio, Mar 8, 2019)
dcb4eb0  Fixed Fatal Warning. (DaveDeCaprio, Mar 8, 2019)
b4cb7bf  Fixed failing unit tests (DaveDeCaprio, Mar 9, 2019)
4fec590  Style/formatting issues from code review (DaveDeCaprio, Mar 11, 2019)
db0db18  Style/formatting issues from code review (DaveDeCaprio, Mar 11, 2019)
b5b30f3  Changed plan string length to bytesConf and gave a better message for… (DaveDeCaprio, Mar 13, 2019)
e4afa26  Tabs to spaces (DaveDeCaprio, Mar 13, 2019)
Added plan size limits to StringConcat
DaveDeCaprio committed Jan 21, 2019
commit 2eecbfac0a60dc5a49ef359ef748eaec940e244b
@@ -308,10 +308,10 @@ object QueryPlan extends PredicateHelper {
    */
   def append[T <: QueryPlan[T]](
       plan: => QueryPlan[T],
-      append: String => Unit,
+      append: String => Boolean,
       verbose: Boolean,
       addSuffix: Boolean,
-      maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
+      maxFields: Int = SQLConf.get.maxToStringFields): Boolean = {
     try {
       plan.treeString(append, verbose, addSuffix, maxFields)
     } catch {
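The signature change above threads a Boolean "keep going" signal from the string sink back to the code emitting plan text. A minimal standalone sketch of the pattern (hypothetical names, not the Spark code itself):

// Sketch: a producer that stops emitting once the sink reports it is full,
// mirroring the String => Boolean callback introduced in this diff.
object AppendCallbackSketch {
  def writeAll(parts: Seq[String], append: String => Boolean): Boolean =
    parts.forall(append) // forall short-circuits at the first false

  def main(args: Array[String]): Unit = {
    val sb = new StringBuilder
    val limit = 12
    val append = (s: String) => { sb.append(s); sb.length < limit }
    val finished = writeAll(Seq("== Plan ==\n", "Scan\n", "Filter\n"), append)
    println(s"finished=$finished output=${sb.toString}")
  }
}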
@@ -487,10 +487,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   def treeString(
-      append: String => Unit,
+      append: String => Boolean,
       verbose: Boolean,
       addSuffix: Boolean,
-      maxFields: Int): Unit = {
+      maxFields: Int): Boolean = {
     generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
   }
 
@@ -554,11 +554,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      append: String => Unit,
+      append: String => Boolean,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
-      maxFields: Int): Unit = {
+      maxFields: Int): Boolean = {
 
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
@@ -591,6 +591,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       children.last.generateTreeString(
         depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
     }
+    append("")
   }
 
   /**
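A reading of the trailing append(""): since the last expression of a Scala method is its return value, appending an empty string makes generateTreeString return the callback's Boolean status, that is, whether the concatenator still has room, without adding any visible output.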

This file was deleted.

@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.regex.{Pattern, PatternSyntaxException}
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.unsafe.types.UTF8String
 
-object StringUtils {
+object StringUtils extends Logging {
 
   /**
    * Validate and convert SQL 'like' pattern to a Java regular expression.
@@ -92,31 +95,57 @@ object StringUtils {
 
   /**
    * Concatenation of sequence of strings to final string with cheap append method
-   * and one memory allocation for the final string.
+   * and one memory allocation for the final string. Can also bound the final size of
+   * the string.
    */
-  class StringConcat {
+  class StringConcat(val maxLength: Int = Integer.MAX_VALUE) {
[Review comment, Contributor]: You generally cannot allocate an array of Integer.MAX_VALUE elements. Can you use ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH instead?
     private val strings = new ArrayBuffer[String]
     private var length: Int = 0
 
+    def atLimit: Boolean = length >= maxLength
+
     /**
      * Appends a string and accumulates its length to allocate a string buffer for all
-     * appended strings once in the toString method.
+     * appended strings once in the toString method. Returns true if the string still
+     * has room for further appends before it hits its max limit.
      */
-    def append(s: String): Unit = {
-      if (s != null) {
+    def append(s: String): Boolean = {
+      if (!atLimit && s != null) {
         strings.append(s)
         length += s.length
       }
+      return !atLimit
     }

     /**
      * The method allocates memory for all appended strings, writes them to the memory and
      * returns concatenated string.
      */
     override def toString: String = {
-      val result = new java.lang.StringBuilder(length)
-      strings.foreach(result.append)
+      val finalLength = Math.min(length, maxLength)
[Review comment, Contributor]: Two things:
  1. You could also avoid the complexity and just log the last string here.
  2. If we go down this path, can you make sure the methods you are using are not making copies of the strings buffer? I am reasonably sure that dropRight does this.
+      val result = new java.lang.StringBuilder(finalLength)
+      strings.dropRight(1).foreach(result.append)
+      strings.lastOption.foreach { s =>
+        val lastLength = Math.min(s.length, maxLength - result.length())
+        result.append(s, 0, lastLength)
+      }
       result.toString
     }
   }
 
+  /** Whether we have warned about plan string truncation yet. */
+  private val planSizeWarningPrinted = new AtomicBoolean(false)
[Review comment, Member]: Is it shared across all instances of PlanStringConcat? I don't think that is a good decision.

[Reply, Contributor (author)]: It is shared because the goal is to show the warning only once (see the comment below). If we change that decision we wouldn't need this.

[Reply, Member]: I think the warning should be printed once per plan, not globally for all plans forever. As I can see, the global variable planSizeWarningPrinted is changed from false to true in PlanStringConcat.toString but never back. So, once one plan has reached maxLength, the warning will not appear for any other plan.
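For readers unfamiliar with the idiom under discussion: compareAndSet on an AtomicBoolean succeeds for exactly one caller, even under concurrency, which is what makes it a thread-safe log-once flag. A small self-contained sketch (hypothetical names):

import java.util.concurrent.atomic.AtomicBoolean

object WarnOnce {
  private val warned = new AtomicBoolean(false)

  // compareAndSet(false, true) flips the flag exactly once across threads,
  // so the warning is emitted at most once per JVM.
  def warn(msg: String): Unit =
    if (warned.compareAndSet(false, true)) Console.err.println(s"WARN: $msg")
}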


+  /** A string concatenator for plan strings. Uses length from a configured value, and
+    * prints a warning the first time a plan is truncated. */
[Review comment, Member]: Why is it important to show the warning only once, the first time? How many times do you expect toString to be called for a particular plan?

[Reply, Contributor (author)]: I was mirroring the behavior used by org.apache.spark.sql.catalyst.util.truncatedString, which does an equivalent kind of truncation if a data frame has too many fields. It only prints the warning out once. It's a similar warning, so I copied the existing behavior.

[Reply, Member]: It makes sense for truncatedString because it is called many times for the same plan, but in the case of a plan, shouldn't toString be called only once (or maybe a few times)?

+  class PlanStringConcat extends StringConcat(SQLConf.get.maxPlanStringLength) {
+    override def toString: String = {
+      if (atLimit && planSizeWarningPrinted.compareAndSet(false, true)) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too long. This " +
+            s"behavior can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+      }
+      super.toString
+    }
+  }
 }
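To make the truncation semantics concrete, here is a hypothetical usage walkthrough; the values mirror the 7-character-limit test cases added later in this commit:

import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

val concat = new StringConcat(7)
concat.append("under")    // true: 5 characters buffered, still below the limit
concat.append("over")     // false: buffered length reaches 9, the limit is hit
concat.append("extra")    // dropped entirely: atLimit is already true
assert(concat.toString == "underov")  // the last kept string is clipped to fit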
@@ -204,27 +204,6 @@ package object util extends Logging {
     truncatedString(seq, "", sep, "", maxFields)
   }
 
-  /** Whether we have warned about plan string truncation yet. */
-  private val planSizeWarningPrinted = new AtomicBoolean(false)
-
-  def withSizeLimitedWriter[T](writer: Writer)(f: (Writer) => T): Option[T] = {
-    try {
-      // Subtract 3 from the string length to leave room for the "..."
-      val limited = new SizeLimitedWriter(writer, SQLConf.get.maxPlanStringLength - 3)
-      Some(f(limited))
-    }
-    catch {
-      case e: WriterSizeException =>
-        writer.write("...")
-        if (planSizeWarningPrinted.compareAndSet(false, true)) {
-          logWarning(
-            "Truncated the string representation of a plan since it was too long. This " +
-            s"behavior can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
-        }
-        None
-    }
-  }
-
   /* FIX ME
   implicit class debugLogging(a: Any) {
     def debugLogging() {
@@ -1627,12 +1627,11 @@ object SQLConf {
 
   val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanLength")
     .doc("Maximum number of characters to output for a plan string. If the plan is " +
-      "longer, it will end with \"...\" and further output will be truncated. The default " +
-      "setting always generates a full plan. Set this to a lower value such as 8192 if plan " +
-      "strings are taking up too much memory or are causing OutOfMemory errors in the driver or " +
-      "UI processes.")
-    .longConf
-    .createWithDefault(Long.MaxValue)
+      "longer, further output will be truncated. The default setting always generates a full " +
+      "plan. Set this to a lower value such as 8192 if plan strings are taking up too much " +
+      "memory or are causing OutOfMemory errors in the driver or UI processes.")
+    .intConf
+    .createWithDefault(Int.MaxValue)
 
   val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
     buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs")
@@ -2065,7 +2064,7 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
-  def maxPlanStringLength: Long = getConf(SQLConf.MAX_PLAN_STRING_LENGTH)
+  def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH)
 
   def setCommandRejectsSparkCoreConfs: Boolean =
     getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)

This file was deleted.

@@ -56,4 +56,26 @@ class StringUtilsSuite extends SparkFunSuite {
     assert(concat("1", "2") == "12")
     assert(concat("abc", "\n", "123") == "abc\n123")
   }
+
+  test("string concatenation with limit") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat(7))((acc, s) => {acc.append(s); acc}).toString
+    }
+    assert(concat("under") == "under")
+    assert(concat("under", "over", "extra") == "underov")
+    assert(concat("underover") == "underov")
+    assert(concat("under", "ov") == "underov")
+  }
+
+  test("string concatenation return value") {
+    assert(new StringConcat(7).append("under") == true)
+    assert(new StringConcat(7).append("underover") == false)
+    assert(new StringConcat(7).append("underov") == false)
+  }
+
+  test("string concatenation append after limit") {
+    val concat = new StringConcat(7)
+    concat.append("underover")
+    assert(concat.append("extra") == false)
+  }
 }
@@ -20,17 +20,15 @@ package org.apache.spark.sql.execution
 import java.io.{BufferedWriter, OutputStreamWriter}
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.catalyst.util.withSizeLimitedWriter
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -106,22 +104,14 @@ class QueryExecution(
       ReuseSubquery(sparkSession.sessionState.conf))
 
   def simpleString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     concat.append("== Physical Plan ==\n")
     QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
     concat.append("\n")
     concat.toString
   }
 
-  // private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
-  //   try
-  //     withSizeLimitedWriter(writer)(f)
-  //   catch {
-  //     case e: AnalysisException => writer.write(e.toString)
-  //   }
-  // }
-  //
-  private def writePlans(append: String => Unit, maxFields: Int): Unit = {
+  private def writePlans(append: String => Boolean, maxFields: Int): Boolean = {
     val (verbose, addSuffix) = (true, false)
     append("== Parsed Logical Plan ==\n")
     QueryPlan.append(logical, append, verbose, addSuffix, maxFields)
@@ -142,13 +132,13 @@ class QueryExecution(
   }
 
   override def toString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     writePlans(concat.append, SQLConf.get.maxToStringFields)
     concat.toString
   }
 
   def stringWithStats: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     val maxFields = SQLConf.get.maxToStringFields
 
     // trigger to compute stats for logical plans
@@ -203,9 +193,12 @@ class QueryExecution(
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
     val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
+
+    val append = (s: String) => {
+      writer.write(s)
+      true
+    }
     try {
-      writePlans(writer.write, maxFields)
+      writePlans(append, maxFields)
       writer.write("\n== Whole Stage Codegen ==\n")
       org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
     } finally {
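Note that the adapter above always returns true: when a plan is explicitly written to a file, truncation is not wanted, so the callback reports unlimited room, and the length limit is enforced only for the in-memory PlanStringConcat renderings.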