28 changes: 19 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1266,16 +1266,18 @@ private[spark] object Utils extends Logging {
}

/** Default filtering function for finding call sites using `getCallSite`. */
private def coreExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the "core" Spark API that we want to skip when
// finding the call site of a method.
private def sparkInternalExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the internal Spark APIs
// that we want to skip when finding the call site of a method.
val SPARK_CORE_CLASS_REGEX =
"""^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r
val SCALA_CORE_CLASS_PREFIX = "scala"
val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined
val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX)
// If the class is a Spark internal class or a Scala class, then exclude.
isSparkCoreClass || isScalaClass
isSparkClass || isScalaClass
}

/**
@@ -1285,7 +1287,7 @@ private[spark] object Utils extends Logging {
*
* @param skipClass Function that is used to exclude non-user-code classes.
*/
def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
// transformation, a SparkContext function (such as parallelize), or anything else that leads
@@ -1324,9 +1326,17 @@
}

val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
CallSite(
shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
longForm = callStack.take(callStackDepth).mkString("\n"))
val shortForm =
if (firstUserFile == "HiveSessionImpl.java") {
// To be more user friendly, show a nicer string for queries submitted from the JDBC
// server.
"Spark JDBC Server Query"
} else {
s"$lastSparkMethod at $firstUserFile:$firstUserLine"
}
val longForm = callStack.take(callStackDepth).mkString("\n")

CallSite(shortForm, longForm)
}

/** Return a string containing part of a file from byte 'start' to 'end'. */
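For reference, a minimal, self-contained sketch (not part of this patch) of how the renamed exclusion function classifies stack-frame class names. The regexes mirror the ones in the hunk above; the object name and the sample class names are illustrative only.

object CallSiteExclusionSketch {
  // Mirrors the patterns in the diff: Spark core/RDD/broadcast classes, Spark SQL classes,
  // and anything under the scala.* namespace are treated as internal frames to skip.
  private val SPARK_CORE_CLASS_REGEX =
    """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
  private val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r
  private val SCALA_CORE_CLASS_PREFIX = "scala"

  def sparkInternalExclusionFunction(className: String): Boolean = {
    val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
      SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined
    val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX)
    isSparkClass || isScalaClass
  }

  def main(args: Array[String]): Unit = {
    // Illustrative class names: with the SQL regex added, SQLContext frames are now skipped,
    // so the reported call site falls through to user code (or the JDBC/Py4J entry point).
    Seq(
      "org.apache.spark.rdd.RDD",         // excluded by the core regex
      "org.apache.spark.sql.SQLContext",  // excluded by the new SQL regex
      "scala.collection.Iterator",        // excluded by the scala prefix
      "com.example.MyJob"                 // kept: treated as user code
    ).foreach { c => println(s"$c -> exclude=${sparkInternalExclusionFunction(c)}") }
  }
}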
3 changes: 2 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -237,7 +237,8 @@ def explain(self, extended=False):
:param extended: boolean, default ``False``. If ``False``, prints only the physical plan.

>>> df.explain()
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
NativeMethodAccessorImpl.java:...

>>> df.explain(True)
== Parsed Logical Plan ==
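The doctest above changes because org.apache.spark.sql.* frames are now excluded, so the call site reported for a DataFrame built from Python falls through to the reflective entry point (NativeMethodAccessorImpl.java) rather than SQLContext.scala. Below is a rough sketch, not part of the patch, of the new short-form selection from the Utils.scala hunk; the method names and line numbers passed in are made up for illustration.

object ShortFormSketch {
  // Mirrors the shortForm logic added in the Utils.scala hunk; inputs are illustrative.
  def shortForm(lastSparkMethod: String, firstUserFile: String, firstUserLine: Int): String =
    if (firstUserFile == "HiveSessionImpl.java") {
      // Queries submitted through the JDBC server get a friendlier label.
      "Spark JDBC Server Query"
    } else {
      s"$lastSparkMethod at $firstUserFile:$firstUserLine"
    }

  def main(args: Array[String]): Unit = {
    // A DataFrame created from Python now surfaces the reflective entry point as the
    // "user" frame, matching the updated doctest expectation above (line numbers made up).
    println(shortForm("applySchemaToPythonRDD", "NativeMethodAccessorImpl.java", 0))
    println(shortForm("executeStatement", "HiveSessionImpl.java", 231))
  }
}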