48 changes: 0 additions & 48 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
@@ -31,7 +31,6 @@ import java.security.SecureRandom
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.atomic.AtomicBoolean
import java.util.zip.GZIPInputStream

import scala.annotation.tailrec
@@ -93,53 +92,6 @@ private[spark] object Utils extends Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null

/**
* The performance overhead of creating and logging strings for wide schemas can be large. To
* limit the impact, we bound the number of fields to include by default. This can be overridden
* by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.
*/
val DEFAULT_MAX_TO_STRING_FIELDS = 25

private[spark] def maxNumToStringFields = {
if (SparkEnv.get != null) {
SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
} else {
DEFAULT_MAX_TO_STRING_FIELDS
}
}

/** Whether we have warned about plan string truncation yet. */
private val truncationWarningPrinted = new AtomicBoolean(false)

/**
* Format a sequence with semantics similar to calling .mkString(). Any elements beyond
* maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder.
*
* @return the trimmed and formatted string.
*/
def truncatedString[T](
seq: Seq[T],
start: String,
sep: String,
end: String,
maxNumFields: Int = maxNumToStringFields): String = {
if (seq.length > maxNumFields) {
if (truncationWarningPrinted.compareAndSet(false, true)) {
logWarning(
"Truncated the string representation of a plan since it was too large. This " +
"behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
}
val numFields = math.max(0, maxNumFields - 1)
seq.take(numFields).mkString(
start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
} else {
seq.mkString(start, sep, end)
}
}

/** Shorthand for calling truncatedString() without start or end strings. */
def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")

/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
8 changes: 0 additions & 8 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
@@ -45,14 +45,6 @@ import org.apache.spark.scheduler.SparkListener

class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("truncatedString") {
assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
}

test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)
Original file line number Diff line number Diff line change
@@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the basic expression abstract classes in Catalyst.
@@ -237,7 +237,7 @@ abstract class Expression extends TreeNode[Expression] {

override def simpleString: String = toString

override def toString: String = prettyName + Utils.truncatedString(
override def toString: String = prettyName + truncatedString(
flatArguments.toSeq, "(", ", ", ")")

/**
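With this change, `Expression.toString` renders its arguments through the catalyst-side helper. For context, a rough sketch of the observable behavior — the expression choice, literal values, and exact output below are illustrative, not taken from this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{Coalesce, Literal}

// A deliberately wide expression: 30 arguments against the default limit of 25.
val wideExpr = Coalesce((1 to 30).map(Literal(_)))

// toString is prettyName plus the truncated argument list, i.e. something like
// "coalesce(1, 2, ..., 24, ... 6 more fields)" under the default limit.
println(wideExpr.toString)
```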
Original file line number Diff line number Diff line change
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.random.RandomSampler

/**
@@ -485,7 +485,7 @@ case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)])
override def output: Seq[Attribute] = child.output

override def simpleString: String = {
val cteAliases = Utils.truncatedString(cteRelations.map(_._1), "[", ", ", "]")
val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]")
s"CTE $cteAliases"
}

Original file line number Diff line number Diff line change
@@ -37,9 +37,9 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */
private class MutableInt(var i: Int)
@@ -440,10 +440,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case tn: TreeNode[_] => tn.simpleString :: Nil
case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
case iter: Iterable[_] if iter.isEmpty => Nil
case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil
case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil
case seq: Seq[_] => truncatedString(seq, "[", ", ", "]") :: Nil
case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}") :: Nil
case array: Array[_] if array.isEmpty => Nil
case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil
case array: Array[_] => truncatedString(array, "[", ", ", "]") :: Nil
case null => Nil
case None => Nil
case Some(null) => Nil
@@ -664,7 +664,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
t.forall(_.isInstanceOf[Partitioning]) || t.forall(_.isInstanceOf[DataType]) =>
JArray(t.map(parseToJson).toList)
case t: Seq[_] if t.length > 0 && t.head.isInstanceOf[String] =>
JString(Utils.truncatedString(t, "[", ", ", "]"))
JString(truncatedString(t, "[", ", ", "]"))
case t: Seq[_] => JNull
case m: Map[_, _] => JNull
// if it's a scala object, we can simply keep the full class path.
Original file line number Diff line number Diff line change
@@ -19,13 +19,16 @@ package org.apache.spark.sql.catalyst

import java.io._
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{NumericType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

package object util {
package object util extends Logging {

/** Silences output to stderr or stdout for the duration of f */
def quietly[A](f: => A): A = {
@@ -167,6 +170,38 @@ package object util {
builder.toString()
}

/** Whether we have warned about plan string truncation yet. */
private val truncationWarningPrinted = new AtomicBoolean(false)

/**
* Format a sequence with semantics similar to calling .mkString(). Any elements beyond
* maxNumFields will be dropped and replaced by a "... N more fields" placeholder.
*
* @return the trimmed and formatted string.
*/
def truncatedString[T](
seq: Seq[T],
start: String,
sep: String,
end: String,
maxNumFields: Int = SQLConf.get.maxToStringFields): String = {
if (seq.length > maxNumFields) {
if (truncationWarningPrinted.compareAndSet(false, true)) {
logWarning(
"Truncated the string representation of a plan since it was too large. This " +
s"behavior can be adjusted by setting '${SQLConf.MAX_TO_STRING_FIELDS.key}'.")
}
val numFields = math.max(0, maxNumFields - 1)
seq.take(numFields).mkString(
start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
} else {
seq.mkString(start, sep, end)
}
}

/** Shorthand for calling truncatedString() without start or end strings. */
def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")

/* FIX ME
implicit class debugLogging(a: Any) {
def debugLogging() {
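For reference, a minimal usage sketch of the relocated helper; the element values and the 30-field example are made up, and the default limit of 25 assumes `spark.sql.debug.maxToStringFields` has not been overridden:

```scala
import org.apache.spark.sql.catalyst.util.truncatedString

// Explicit limit: elements beyond maxNumFields - 1 collapse into the placeholder.
truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2)          // "[1, ... 2 more fields]"

// No explicit limit: the default comes from SQLConf.get.maxToStringFields (25 by default).
truncatedString((1 to 30).map(i => s"c$i"), "<", ",", ">")
// "<c1,c2,...,c24,... 6 more fields>" -- the first 24 elements kept, 6 summarized
```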
Original file line number Diff line number Diff line change
@@ -1594,6 +1594,13 @@ object SQLConf {
"WHERE, which does not follow SQL standard.")
.booleanConf
.createWithDefault(false)

val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields")
.doc("Maximum number of fields of sequence-like entries can be converted to strings " +
"in debug output. Any elements beyond the limit will be dropped and replaced by a" +
""" "... N more fields" placeholder.""")
.intConf
.createWithDefault(25)
}

/**
@@ -2009,6 +2016,8 @@ class SQLConf extends Serializable with Logging {

def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)

def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
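A hedged sketch of how the new knob might be adjusted; the session setup below is illustrative and not part of this change:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                                   // illustrative
  .appName("max-to-string-fields-demo")                 // illustrative
  .config("spark.sql.debug.maxToStringFields", "100")   // raise the limit from the default of 25
  .getOrCreate()

// Alternatively, adjust it on a running session:
spark.conf.set("spark.sql.debug.maxToStringFields", "50")
```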
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString}
import org.apache.spark.util.Utils

/**
@@ -346,7 +346,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru

override def simpleString: String = {
val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}")
Utils.truncatedString(fieldTypes, "struct<", ",", ">")
truncatedString(fieldTypes, "struct<", ",", ">")
}

override def catalogString: String = {
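To show the effect on wide schemas, a small sketch — the field names, types, and count are invented for the example:

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// 40 fields against the default limit of 25.
val wide = StructType((1 to 40).map(i => StructField(s"c$i", IntegerType)))

// simpleString keeps the first 24 fields and summarizes the rest, e.g.
// "struct<c1:int,c2:int,...,c24:int,... 16 more fields>"
println(wide.simpleString)
```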
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.truncatedString

class UtilSuite extends SparkFunSuite {
test("truncatedString") {
assert(truncatedString(Nil, "[", ", ", "]", 2) == "[]")
assert(truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
assert(truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
}
}
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -56,8 +57,8 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
case (key, value) =>
key + ": " + StringUtils.abbreviate(redact(value), 100)
}
val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
val metadataStr = truncatedString(metadataEntries, " ", ", ", "")
s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]")}$metadataStr"
}

override def verboseString: String = redact(super.verboseString)
Original file line number Diff line number Diff line change
@@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils

object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -197,6 +197,6 @@ case class RDDScanExec(
}

override def simpleString: String = {
s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}"
s"$nodeName${truncatedString(output, "[", ",", "]")}"
}
}
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
@@ -206,7 +207,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
writer.write("== Parsed Logical Plan ==\n")
writeOrError(writer)(logical.treeString(_, verbose, addSuffix))
writer.write("\n== Analyzed Logical Plan ==\n")
val analyzedOutput = stringOrError(Utils.truncatedString(
val analyzedOutput = stringOrError(truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
writer.write(analyzedOutput)
writer.write("\n")
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
@@ -930,9 +931,9 @@ case class HashAggregateExec(

testFallbackStartsAt match {
case None =>
val keyString = Utils.truncatedString(groupingExpressions, "[", ", ", "]")
val functionString = Utils.truncatedString(allAggregateExpressions, "[", ", ", "]")
val outputString = Utils.truncatedString(output, "[", ", ", "]")
val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
val outputString = truncatedString(output, "[", ", ", "]")
if (verbose) {
s"HashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
} else {
Original file line number Diff line number Diff line change
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.Utils

/**
* A hash-based aggregate operator that supports [[TypedImperativeAggregate]] functions that may
@@ -143,9 +143,9 @@ case class ObjectHashAggregateExec(

private def toString(verbose: Boolean): String = {
val allAggregateExpressions = aggregateExpressions
val keyString = Utils.truncatedString(groupingExpressions, "[", ", ", "]")
val functionString = Utils.truncatedString(allAggregateExpressions, "[", ", ", "]")
val outputString = Utils.truncatedString(output, "[", ", ", "]")
val keyString = truncatedString(groupingExpressions, "[", ", ", "]")
val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]")
val outputString = truncatedString(output, "[", ", ", "]")
if (verbose) {
s"ObjectHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
} else {