From de38cf895f12f4433b10702fab12efaa179dce27 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Dec 2018 12:42:41 +0100
Subject: [PATCH 01/13] Use append function instead of Writer

---
 .../spark/sql/catalyst/trees/TreeNode.scala   | 26 +++++++--------
 .../spark/sql/execution/QueryExecution.scala  | 32 +++++++++----------
 .../sql/execution/WholeStageCodegenExec.scala |  8 ++---
 3 files changed, 33 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 21e59bbd283e..d46b8b71e744 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -483,7 +483,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
     val writer = new StringBuilderWriter()
     try {
-      treeString(writer, verbose, addSuffix, maxFields)
+      treeString(writer.write, verbose, addSuffix, maxFields)
       writer.toString
     } finally {
       writer.close()
@@ -491,11 +491,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   def treeString(
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       addSuffix: Boolean,
       maxFields: Int): Unit = {
-    generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
+    generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
   }
 
   /**
@@ -558,7 +558,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -566,9 +566,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
-        writer.write(if (isLast) "   " else ":  ")
+        append(if (isLast) "   " else ":  ")
       }
-      writer.write(if (lastChildren.last) "+- " else ":- ")
+      append(if (lastChildren.last) "+- " else ":- ")
     }
 
     val str = if (verbose) {
@@ -576,24 +576,24 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     } else {
       simpleString(maxFields)
     }
-    writer.write(prefix)
-    writer.write(str)
-    writer.write("\n")
+    append(prefix)
+    append(str)
+    append("\n")
 
     if (innerChildren.nonEmpty) {
       innerChildren.init.foreach(_.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields))
       innerChildren.last.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields)
     }
 
     if (children.nonEmpty) {
       children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
+        depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields))
       children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
+        depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9b8d2e830867..b75ac09fa54c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -202,34 +202,34 @@ class QueryExecution(
     """.stripMargin.trim
   }
 
-  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
-    try f(writer)
+  private def appendOrError(append: String => Unit)(f: (String => Unit) => Unit): Unit = {
+    try f(append)
     catch {
-      case e: AnalysisException => writer.write(e.toString)
+      case e: AnalysisException => append(e.toString)
     }
   }
 
-  private def writePlans(writer: Writer, maxFields: Int): Unit = {
+  private def writePlans(append: String => Unit, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
 
-    writer.write("== Parsed Logical Plan ==\n")
-    writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Analyzed Logical Plan ==\n")
+    append("== Parsed Logical Plan ==\n")
+    appendOrError(append)(logical.treeString(_, verbose, addSuffix, maxFields))
+    append("\n== Analyzed Logical Plan ==\n")
     val analyzedOutput = stringOrError(truncatedString(
       analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
-    writer.write(analyzedOutput)
-    writer.write("\n")
-    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Optimized Logical Plan ==\n")
-    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Physical Plan ==\n")
-    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
+    append(analyzedOutput)
+    append("\n")
+    appendOrError(append)(analyzed.treeString(_, verbose, addSuffix, maxFields))
+    append("\n== Optimized Logical Plan ==\n")
+    appendOrError(append)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
+    append("\n== Physical Plan ==\n")
+    appendOrError(append)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
   }
 
   override def toString: String = withRedaction {
     val writer = new StringBuilderWriter()
     try {
-      writePlans(writer, SQLConf.get.maxToStringFields)
+      writePlans(writer.write, SQLConf.get.maxToStringFields)
       writer.toString
     } finally {
       writer.close()
@@ -290,7 +290,7 @@ class QueryExecution(
 
     val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
     try {
-      writePlans(writer, maxFields)
+      writePlans(writer.write, maxFields)
       writer.write("\n== Whole Stage Codegen ==\n")
       org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
     } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index f4927dedabe5..3b0a99669ccd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -493,7 +493,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -501,7 +501,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
     child.generateTreeString(
       depth,
       lastChildren,
-      writer,
+      append,
       verbose,
       prefix = "",
       addSuffix = false,
@@ -777,7 +777,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -785,7 +785,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
     child.generateTreeString(
       depth,
       lastChildren,
-      writer,
+      append,
       verbose,
       s"*($codegenStageId) ",
       false,

From af5c9ed389f4d6caaba49969e48d966d9d976831 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Dec 2018 12:43:52 +0100
Subject: [PATCH 02/13] Fix typo

---
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b75ac09fa54c..5ab6312b7e13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -282,7 +282,7 @@ class QueryExecution(
   /**
    * Dumps debug information about query execution into the specified file.
    *
-   * @param maxFields maximim number of fields converted to string representation.
+   * @param maxFields maximum number of fields converted to string representation.
    */
   def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
     val filePath = new Path(path)

From 0a36a26a8bdbace64be8260189475014cb1c0e27 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Dec 2018 13:20:10 +0100
Subject: [PATCH 03/13] StringRope on List

---
 .../spark/sql/catalyst/util/StringUtils.scala | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index bc861a805ce6..9912fa994294 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -87,4 +87,26 @@ object StringUtils {
     }
     funcNames.toSeq
   }
+
+  class StringRope {
+    private var list = List.empty[String]
+    private var length: Int = 0
+
+    def append(s: String): Unit = {
+      list = s :: list
+      length += s.length
+    }
+
+    override def toString: String = {
+      val buffer = new StringBuffer(length)
+      var reversed = list.reverse
+
+      while (!reversed.isEmpty) {
+        buffer.append(reversed.head)
+        reversed = reversed.tail
+      }
+
+      buffer.toString
+    }
+  }
 }
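The prepend-then-reverse pattern in patch 03 deserves a note: `::` is O(1) on an immutable List, so `append` stays cheap, and the single reverse in `toString` restores insertion order before one pre-sized buffer is filled. A standalone sketch of the same idea (`MiniRope` is an illustrative name, not part of the patch):

    // Sketch of the list-based rope from patch 03: parts are prepended
    // in O(1) and therefore stored newest-first; toString reverses once
    // and fills a buffer pre-sized from the running length, i.e. one
    // allocation for the final result.
    class MiniRope {
      private var parts = List.empty[String]   // newest part first
      private var total = 0                    // running length of all parts

      def append(s: String): Unit = {
        parts = s :: parts                     // O(1) prepend
        total += s.length
      }

      override def toString: String = {
        val buffer = new java.lang.StringBuffer(total)  // sized up front
        parts.reverse.foreach(p => buffer.append(p))    // restore input order
        buffer.toString
      }
    }

    val rope = new MiniRope
    Seq("== Plan ==\n", "Scan\n").foreach(rope.append)
    assert(rope.toString == "== Plan ==\nScan\n")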
From 07ae72d952a239aef1b58e0ae3e1028292de293e Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Dec 2018 13:39:17 +0100
Subject: [PATCH 04/13] Tests for StringRope

---
 .../spark/sql/catalyst/util/StringUtils.scala      |  6 ++++--
 .../spark/sql/catalyst/util/StringUtilsSuite.scala | 13 +++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 9912fa994294..1a0cc4d80f6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -93,8 +93,10 @@ object StringUtils {
     private var length: Int = 0
 
     def append(s: String): Unit = {
-      list = s :: list
-      length += s.length
+      if (s != null) {
+        list = s :: list
+        length += s.length
+      }
     }
 
     override def toString: String = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 78fee5135c3a..42be9980e95a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -43,4 +43,17 @@ class StringUtilsSuite extends SparkFunSuite {
     assert(filterPattern(names, " a. ") === Seq("a1", "a2"))
     assert(filterPattern(names, " d* ") === Nil)
   }
+
+  test("string rope") {
+    def toRope(seq: String*): String = {
+      seq.foldLeft(new StringRope())((rope, s) => {rope.append(s); rope}).toString
+    }
+
+    assert(new StringRope().toString == "")
+    assert(toRope("") == "")
+    assert(toRope(null) == "")
+    assert(toRope("a") == "a")
+    assert(toRope("1", "2") == "12")
+    assert(toRope("abc", "\n", "123") == "abc\n123")
+  }
 }

From f621de72f6f3e623c6660e50302f2e6abb5557d7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Dec 2018 13:54:16 +0100
Subject: [PATCH 05/13] Porting on StringRope

---
 .../spark/sql/catalyst/trees/TreeNode.scala   | 14 ++++------
 .../spark/sql/execution/QueryExecution.scala  | 17 +++++-------
 .../spark/sql/execution/debug/package.scala   | 26 +++++++------------
 3 files changed, 22 insertions(+), 35 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d46b8b71e744..202d1b527de6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.catalyst.trees
 
-import java.io.Writer
 import java.util.UUID
 
 import scala.collection.Map
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.commons.lang3.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
@@ -37,6 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
+import org.apache.spark.sql.catalyst.util.StringUtils.StringRope
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -481,13 +480,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val writer = new StringBuilderWriter()
-    try {
-      treeString(writer.write, verbose, addSuffix, maxFields)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val rope = new StringRope()
+
+    treeString(rope.append, verbose, addSuffix, maxFields)
+    rope.toString
   }
 
   def treeString(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5ab6312b7e13..c842efa0a0d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{BufferedWriter, OutputStreamWriter, Writer}
+import java.io.{BufferedWriter, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
@@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.StringUtils.StringRope
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -227,13 +227,10 @@ class QueryExecution(
   }
 
   override def toString: String = withRedaction {
-    val writer = new StringBuilderWriter()
-    try {
-      writePlans(writer.write, SQLConf.get.maxToStringFields)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val rope = new StringRope()
+
+    writePlans(rope.append, SQLConf.get.maxToStringFields)
+    rope.toString
   }
 
   def stringWithStats: String = withRedaction {
@@ -292,7 +289,7 @@ class QueryExecution(
     try {
       writePlans(writer.write, maxFields)
       writer.write("\n== Whole Stage Codegen ==\n")
-      org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+      org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
     } finally {
       writer.close()
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index ae8197f617a2..d52d889318cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -17,13 +17,10 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.Writer
 import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.commons.io.output.StringBuilderWriter
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -32,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.util.StringUtils.StringRope
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
@@ -73,24 +71,20 @@ package object debug {
    * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
    */
   def codegenString(plan: SparkPlan): String = {
-    val writer = new StringBuilderWriter()
+    val rope = new StringRope()
 
-    try {
-      writeCodegen(writer, plan)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    writeCodegen(rope.append, plan)
+    rope.toString
   }
 
-  def writeCodegen(writer: Writer, plan: SparkPlan): Unit = {
+  def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
     val codegenSeq = codegenStringSeq(plan)
-    writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
+    append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
     for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
-      writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
-      writer.write(subtree)
-      writer.write("\nGenerated code:\n")
-      writer.write(s"${code}\n")
+      append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+      append(subtree)
+      append("\nGenerated code:\n")
+      append(s"${code}\n")
     }
   }

From ca4aed8f7ef02184afd5e3ef057b6babde4274dc Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Dec 2018 18:17:11 +0100
Subject: [PATCH 06/13] Rope other methods

---
 .../spark/sql/execution/QueryExecution.scala  | 33 ++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index c842efa0a0d0..8d1c7f60db6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -108,10 +108,6 @@ class QueryExecution(
       ReuseExchange(sparkSession.sessionState.conf),
       ReuseSubquery(sparkSession.sessionState.conf))
 
-  protected def stringOrError[A](f: => A): String =
-    try f.toString catch { case e: AnalysisException => e.toString }
-
-
   /**
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
@@ -197,9 +193,14 @@ class QueryExecution(
   }
 
   def simpleString: String = withRedaction {
-    s"""== Physical Plan ==
-       |${stringOrError(executedPlan.treeString(verbose = false))}
-      """.stripMargin.trim
+    val rope = new StringRope()
+
+    rope.append("== Physical Plan ==\n")
+    appendOrError(rope.append)(
+      executedPlan.treeString(_, false, false, SQLConf.get.maxToStringFields))
+    rope.append("\n")
+
+    rope.toString
   }
 
   private def appendOrError(append: String => Unit)(f: (String => Unit) => Unit): Unit = {
@@ -210,6 +211,9 @@ class QueryExecution(
   }
 
   private def writePlans(append: String => Unit, maxFields: Int): Unit = {
+    def stringOrError[A](f: => A): String = {
+      try f.toString catch { case e: AnalysisException => e.toString }
+    }
     val (verbose, addSuffix) = (true, false)
 
     append("== Parsed Logical Plan ==\n")
@@ -234,15 +238,20 @@ class QueryExecution(
   }
 
   def stringWithStats: String = withRedaction {
+    val rope = new StringRope()
+    val maxFields = SQLConf.get.maxToStringFields
+
     // trigger to compute stats for logical plans
    optimizedPlan.stats
 
     // only show optimized logical plan and physical plan
-    s"""== Optimized Logical Plan ==
-       |${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}
-       |== Physical Plan ==
-       |${stringOrError(executedPlan.treeString(verbose = true))}
-      """.stripMargin.trim
+    rope.append("== Optimized Logical Plan ==\n")
+    appendOrError(rope.append)(optimizedPlan.treeString(_, true, true, maxFields))
+    rope.append("\n== Physical Plan ==\n")
+    appendOrError(rope.append)(executedPlan.treeString(_, true, false, maxFields))
+    rope.append("\n")
+
+    rope.toString
   }
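By patch 06 the series has converged on one convention: every formatting routine takes an `append: String => Unit` sink instead of a `java.io.Writer`, so the same code can fill an in-memory rope (`rope.append`) or stream to disk (`writer.write`, as `toFile` still does). A minimal sketch of that decoupling (`renderPlan` is hypothetical, not Spark's API):

    // Hypothetical formatter in the style of writePlans/treeString:
    // it only knows how to emit strings into whatever sink it is given.
    def renderPlan(append: String => Unit, lines: Seq[String]): Unit = {
      append("== Physical Plan ==\n")
      lines.foreach(l => append(l + "\n"))
    }

    // Sink 1: accumulate in memory (what toString does via the rope).
    val sb = new StringBuilder
    renderPlan(s => sb.append(s), Seq("*(1) Project", "+- Scan"))

    // Sink 2: stream to a Writer (what toFile does via writer.write).
    val writer = new java.io.StringWriter()
    renderPlan(writer.write, Seq("*(1) Project", "+- Scan"))
    assert(sb.toString == writer.toString)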
From d727959bc6602e6313ace85ca8bbd50da121ca1b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 30 Dec 2018 12:29:12 +0100
Subject: [PATCH 07/13] Added docs

---
 .../apache/spark/sql/catalyst/util/StringUtils.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 1a0cc4d80f6e..deeb3d4c75b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -88,10 +88,18 @@ object StringUtils {
     funcNames.toSeq
   }
 
+  /**
+   * Concatenation of sequence of strings to final string with cheap append method
+   * and one memory allocation for the final string.
+   */
   class StringRope {
     private var list = List.empty[String]
     private var length: Int = 0
 
+    /**
+     * Appends a string and accumulates its length to allocate a string buffer for all
+     * appended strings once in the toString method.
+     */
     def append(s: String): Unit = {
       if (s != null) {
         list = s :: list
@@ -99,6 +107,10 @@ object StringUtils {
       }
     }
 
+    /**
+     * The method allocates memory for all appended strings, writes them to the memory and
+     * returns concatenated string.
+     */
     override def toString: String = {
       val buffer = new StringBuffer(length)
       var reversed = list.reverse

From 544b80e0f9da6035635c2f2541930d8a5ebacbe9 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 30 Dec 2018 12:42:20 +0100
Subject: [PATCH 08/13] Replacing List by ArrayBuffer

---
 .../spark/sql/catalyst/util/StringUtils.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index deeb3d4c75b6..f87200d1b438 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.regex.{Pattern, PatternSyntaxException}
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -93,7 +95,7 @@ object StringUtils {
    * and one memory allocation for the final string.
    */
   class StringRope {
-    private var list = List.empty[String]
+    private val rope = new ArrayBuffer[String]
     private var length: Int = 0
 
     /**
@@ -102,7 +104,7 @@ object StringUtils {
      */
     def append(s: String): Unit = {
       if (s != null) {
-        list = s :: list
+        rope.append(s)
         length += s.length
       }
     }
@@ -113,13 +115,8 @@ object StringUtils {
      */
     override def toString: String = {
       val buffer = new StringBuffer(length)
-      var reversed = list.reverse
-
-      while (!reversed.isEmpty) {
-        buffer.append(reversed.head)
-        reversed = reversed.tail
-      }
 
+      rope.foreach(buffer.append)
       buffer.toString
     }
   }

From 059dcf493eba038a5df88a29ab7b6a837cdc07f0 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 30 Dec 2018 13:03:45 +0100
Subject: [PATCH 09/13] Inline stringOrError

---
 .../apache/spark/sql/execution/QueryExecution.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8d1c7f60db6f..35153cd72ef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -211,16 +211,17 @@ class QueryExecution(
   }
 
   private def writePlans(append: String => Unit, maxFields: Int): Unit = {
-    def stringOrError[A](f: => A): String = {
-      try f.toString catch { case e: AnalysisException => e.toString }
-    }
     val (verbose, addSuffix) = (true, false)
 
     append("== Parsed Logical Plan ==\n")
     appendOrError(append)(logical.treeString(_, verbose, addSuffix, maxFields))
     append("\n== Analyzed Logical Plan ==\n")
-    val analyzedOutput = stringOrError(truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
+    val analyzedOutput = try {
+      truncatedString(
+        analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)
+    } catch {
+      case e: AnalysisException => e.toString
+    }
     append(analyzedOutput)
     append("\n")
     appendOrError(append)(analyzed.treeString(_, verbose, addSuffix, maxFields))
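Patch 08 trades the cons-list for an `ArrayBuffer`: appends stay amortized O(1), iteration is already in insertion order, and the reverse step disappears. The reason the class tracks `length` at all is worth a contrast (illustrative code, not from the series): naive `+=` on a `String` re-copies the whole accumulated prefix on every step, while the buffer-plus-presized-builder shape copies each fragment exactly once.

    import scala.collection.mutable.ArrayBuffer

    // Quadratic: each += allocates a new String and copies everything so far.
    def naiveConcat(parts: Seq[String]): String = {
      var s = ""
      parts.foreach(p => s += p)
      s
    }

    // Linear: collect fragments, then one allocation sized by the tracked length.
    def presizedConcat(parts: Seq[String]): String = {
      val buf = ArrayBuffer.empty[String]
      var length = 0
      parts.foreach { p => buf += p; length += p.length }
      val sb = new java.lang.StringBuilder(length)
      buf.foreach(p => sb.append(p))
      sb.toString
    }

    assert(naiveConcat(Seq("a", "b", "c")) == presizedConcat(Seq("a", "b", "c")))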
From fe3bbcf9e56e3436bd16541962a25b5c047da684 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 30 Dec 2018 22:26:47 +0100
Subject: [PATCH 10/13] Move appendOrError to QueryPlan

---
 .../spark/sql/catalyst/plans/QueryPlan.scala  | 17 +++++++++++
 .../spark/sql/execution/QueryExecution.scala  | 28 ++++++-------------
 2 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 125181fb213f..8f5444ed8a5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
 import org.apache.spark.sql.internal.SQLConf
@@ -301,4 +302,20 @@ object QueryPlan extends PredicateHelper {
       Nil
     }
   }
+
+  /**
+   * Converts the query plan to string and appends it via provided function.
+   */
+  def append[T <: QueryPlan[T]](
+      plan: => QueryPlan[T],
+      append: String => Unit,
+      verbose: Boolean,
+      addSuffix: Boolean,
+      maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
+    try {
+      plan.treeString(append, verbose, addSuffix, maxFields)
+    } catch {
+      case e: AnalysisException => append(e.toString)
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 35153cd72ef7..273f8a7dda4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -27,6 +27,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -194,27 +195,16 @@ class QueryExecution(
 
   def simpleString: String = withRedaction {
     val rope = new StringRope()
-
     rope.append("== Physical Plan ==\n")
-    appendOrError(rope.append)(
-      executedPlan.treeString(_, false, false, SQLConf.get.maxToStringFields))
+    QueryPlan.append(executedPlan, rope.append, verbose = false, addSuffix = false)
     rope.append("\n")
-
     rope.toString
   }
 
-  private def appendOrError(append: String => Unit)(f: (String => Unit) => Unit): Unit = {
-    try f(append)
-    catch {
-      case e: AnalysisException => append(e.toString)
-    }
-  }
-
   private def writePlans(append: String => Unit, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
 
     append("== Parsed Logical Plan ==\n")
-    appendOrError(append)(logical.treeString(_, verbose, addSuffix, maxFields))
+    QueryPlan.append(logical, append, verbose, addSuffix, maxFields)
     append("\n== Analyzed Logical Plan ==\n")
     val analyzedOutput = try {
       truncatedString(
@@ -224,16 +214,15 @@ class QueryExecution(
     }
     append(analyzedOutput)
     append("\n")
-    appendOrError(append)(analyzed.treeString(_, verbose, addSuffix, maxFields))
+    QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
     append("\n== Optimized Logical Plan ==\n")
-    appendOrError(append)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
+    QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
     append("\n== Physical Plan ==\n")
-    appendOrError(append)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
+    QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
   }
 
   override def toString: String = withRedaction {
     val rope = new StringRope()
-
     writePlans(rope.append, SQLConf.get.maxToStringFields)
     rope.toString
   }
@@ -247,11 +236,10 @@ class QueryExecution(
 
     // only show optimized logical plan and physical plan
     rope.append("== Optimized Logical Plan ==\n")
-    appendOrError(rope.append)(optimizedPlan.treeString(_, true, true, maxFields))
+    QueryPlan.append(optimizedPlan, rope.append, verbose = true, addSuffix = true, maxFields)
     rope.append("\n== Physical Plan ==\n")
-    appendOrError(rope.append)(executedPlan.treeString(_, true, false, maxFields))
+    QueryPlan.append(executedPlan, rope.append, verbose = true, addSuffix = false, maxFields)
     rope.append("\n")
-
     rope.toString
   }
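One detail of patch 10 that is easy to miss: `QueryPlan.append` takes `plan: => QueryPlan[T]` by name. Plans such as `optimizedPlan` are computed lazily in `QueryExecution`, so merely referencing them can throw `AnalysisException`; passing them by name keeps that evaluation inside the helper's try block. A toy demonstration of the difference (hypothetical names, with a plain `RuntimeException` standing in for `AnalysisException`):

    def byValue(plan: String, append: String => Unit): Unit =
      try append(plan) catch { case e: RuntimeException => append(e.toString) }

    def byName(plan: => String, append: String => Unit): Unit =
      try append(plan) catch { case e: RuntimeException => append(e.toString) }

    def failingPlan: String = throw new RuntimeException("plan resolution failed")

    val out = new StringBuilder
    byName(failingPlan, s => out.append(s))    // catches; appends the error text
    // byValue(failingPlan, s => out.append(s)) would throw at the call site:
    // the argument is evaluated before byValue's try is ever entered.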
From 707c80b97cd888b84afbfac4752e937ba529132f Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 30 Dec 2018 22:33:50 +0100
Subject: [PATCH 11/13] Renaming StringRope to StringConcat

---
 .../spark/sql/catalyst/trees/TreeNode.scala   |  8 ++---
 .../spark/sql/catalyst/util/StringUtils.scala | 13 ++++----
 .../sql/catalyst/util/StringUtilsSuite.scala  |  4 +--
 .../spark/sql/execution/QueryExecution.scala  | 32 +++++++++----------
 .../spark/sql/execution/debug/package.scala   |  9 +++---
 5 files changed, 32 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 202d1b527de6..570a019b2af7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.catalyst.util.StringUtils.StringRope
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -480,10 +480,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val rope = new StringRope()
+    val concat = new StringConcat()
 
-    treeString(rope.append, verbose, addSuffix, maxFields)
-    rope.toString
+    treeString(concat.append, verbose, addSuffix, maxFields)
+    concat.toString
   }
 
   def treeString(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index f87200d1b438..50476001656e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -94,8 +94,8 @@ object StringUtils {
    * Concatenation of sequence of strings to final string with cheap append method
    * and one memory allocation for the final string.
    */
-  class StringRope {
-    private val rope = new ArrayBuffer[String]
+  class StringConcat {
+    private val strings = new ArrayBuffer[String]
     private var length: Int = 0
 
     /**
@@ -104,7 +104,7 @@ object StringUtils {
      */
     def append(s: String): Unit = {
       if (s != null) {
-        rope.append(s)
+        strings.append(s)
         length += s.length
       }
     }
@@ -114,10 +114,9 @@ object StringUtils {
      * returns concatenated string.
      */
     override def toString: String = {
-      val buffer = new StringBuffer(length)
-
-      rope.foreach(buffer.append)
-      buffer.toString
+      val result = new StringBuffer(length)
+      strings.foreach(result.append)
+      result.toString
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 42be9980e95a..411a64c125e5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -46,10 +46,10 @@ class StringUtilsSuite extends SparkFunSuite {
 
   test("string rope") {
     def toRope(seq: String*): String = {
-      seq.foldLeft(new StringRope())((rope, s) => {rope.append(s); rope}).toString
+      seq.foldLeft(new StringConcat())((rope, s) => {rope.append(s); rope}).toString
     }
 
-    assert(new StringRope().toString == "")
+    assert(new StringConcat().toString == "")
     assert(toRope("") == "")
     assert(toRope(null) == "")
     assert(toRope("a") == "a")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 273f8a7dda4e..7fccbf65d852 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.StringUtils.StringRope
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -194,11 +194,11 @@ class QueryExecution(
   }
 
   def simpleString: String = withRedaction {
-    val rope = new StringRope()
-    rope.append("== Physical Plan ==\n")
-    QueryPlan.append(executedPlan, rope.append, verbose = false, addSuffix = false)
-    rope.append("\n")
-    rope.toString
+    val concat = new StringConcat()
+    concat.append("== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
+    concat.append("\n")
+    concat.toString
   }
 
   private def writePlans(append: String => Unit, maxFields: Int): Unit = {
@@ -222,25 +222,25 @@ class QueryExecution(
   }
 
   override def toString: String = withRedaction {
-    val rope = new StringRope()
-    writePlans(rope.append, SQLConf.get.maxToStringFields)
-    rope.toString
+    val concat = new StringConcat()
+    writePlans(concat.append, SQLConf.get.maxToStringFields)
+    concat.toString
   }
 
   def stringWithStats: String = withRedaction {
-    val rope = new StringRope()
+    val concat = new StringConcat()
     val maxFields = SQLConf.get.maxToStringFields
 
     // trigger to compute stats for logical plans
     optimizedPlan.stats
 
     // only show optimized logical plan and physical plan
-    rope.append("== Optimized Logical Plan ==\n")
-    QueryPlan.append(optimizedPlan, rope.append, verbose = true, addSuffix = true, maxFields)
-    rope.append("\n== Physical Plan ==\n")
-    QueryPlan.append(executedPlan, rope.append, verbose = true, addSuffix = false, maxFields)
-    rope.append("\n")
-    rope.toString
+    concat.append("== Optimized Logical Plan ==\n")
+    QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
+    concat.append("\n== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
+    concat.append("\n")
+    concat.toString
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index d52d889318cf..53b74c7c8559 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.catalyst.util.StringUtils.StringRope
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
@@ -71,10 +71,9 @@ package object debug {
    * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
    */
   def codegenString(plan: SparkPlan): String = {
-    val rope = new StringRope()
-
-    writeCodegen(rope.append, plan)
-    rope.toString
+    val concat = new StringConcat()
+    writeCodegen(concat.append, plan)
+    concat.toString
   }
 
   def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {

From 074e9b87716cb3c59ef3a4cb59ba39e070886c55 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 30 Dec 2018 22:36:57 +0100
Subject: [PATCH 12/13] rope -> concat in tests

---
 .../sql/catalyst/util/StringUtilsSuite.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 411a64c125e5..616ec12032db 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -44,16 +44,16 @@ class StringUtilsSuite extends SparkFunSuite {
     assert(filterPattern(names, " d* ") === Nil)
   }
 
-  test("string rope") {
-    def toRope(seq: String*): String = {
-      seq.foldLeft(new StringConcat())((rope, s) => {rope.append(s); rope}).toString
+  test("string concatenation") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
     }
 
     assert(new StringConcat().toString == "")
-    assert(toRope("") == "")
-    assert(toRope(null) == "")
-    assert(toRope("a") == "a")
-    assert(toRope("1", "2") == "12")
-    assert(toRope("abc", "\n", "123") == "abc\n123")
+    assert(concat("") == "")
+    assert(concat(null) == "")
+    assert(concat("a") == "a")
+    assert(concat("1", "2") == "12")
+    assert(concat("abc", "\n", "123") == "abc\n123")
   }
 }
From 29b62bf4365bfadd77e4c44b718124b6764471e0 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 31 Dec 2018 10:31:47 +0100
Subject: [PATCH 13/13] StringBuffer -> StringBuilder

---
 .../scala/org/apache/spark/sql/catalyst/util/StringUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 50476001656e..643b83b1741a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -114,7 +114,7 @@ object StringUtils {
      * returns concatenated string.
      */
     override def toString: String = {
-      val result = new StringBuffer(length)
+      val result = new java.lang.StringBuilder(length)
       strings.foreach(result.append)
       result.toString
     }
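The last patch swaps `java.lang.StringBuffer` for `java.lang.StringBuilder`: both can be pre-sized, but StringBuffer synchronizes every append, and this builder is only ever used from a single thread. The fully qualified name also sidesteps Scala's `Predef.StringBuilder` alias, which points at the Scala collection builder rather than the Java class. For reference, the class the series converges on, consolidated from patches 03-13 (Javadoc omitted), with a usage line mirroring the suite's test:

    class StringConcat {
      private val strings = new scala.collection.mutable.ArrayBuffer[String]
      private var length: Int = 0

      // Null-safe append; the running length lets toString size the
      // builder exactly once.
      def append(s: String): Unit = {
        if (s != null) {
          strings.append(s)
          length += s.length
        }
      }

      // One pre-sized allocation, then each fragment is copied once.
      override def toString: String = {
        val result = new java.lang.StringBuilder(length)
        strings.foreach(result.append)
        result.toString
      }
    }

    val c = new StringConcat()
    Seq("abc", "\n", "123").foreach(c.append)
    assert(c.toString == "abc\n123")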