
Commit ec078ea

pwendell authored and rxin committed
[SPARK-7204] [SQL] Fix callSite for Dataframe and SQL operations
This patch adds SQL to the set of excluded libraries when generating a callSite. This makes the callSite mechanism work properly for the data frame API. I also added a small improvement for JDBC queries where we just use the string "Spark JDBC Server Query" instead of trying to give a callsite that doesn't make any sense to the user.

Before (DF):
![screen shot 2015-04-28 at 1 29 26 pm](https://cloud.githubusercontent.com/assets/320616/7380170/ef63bfb0-edae-11e4-989c-f88a5ba6bbee.png)

After (DF):
![screen shot 2015-04-28 at 1 34 58 pm](https://cloud.githubusercontent.com/assets/320616/7380181/fa7f6d90-edae-11e4-9559-26f163ed63b8.png)

After (JDBC):
![screen shot 2015-04-28 at 2 00 10 pm](https://cloud.githubusercontent.com/assets/320616/7380185/02f5b2a4-edaf-11e4-8e5b-99bdc3df66dd.png)

Author: Patrick Wendell <patrick@databricks.com>

Closes #5757 from pwendell/dataframes and squashes the following commits:

0d931a4 [Patrick Wendell] Attempting to fix PySpark tests
85bf740 [Patrick Wendell] [SPARK-7204] Fix callsite for dataframe operations.

(cherry picked from commit 1fd6ed9)
Signed-off-by: Reynold Xin <rxin@databricks.com>
1 parent 7e21026 commit ec078ea
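For orientation, here is a small self-contained sketch (not part of the patch) of the classification rule this commit changes: a stack frame is skipped when picking a call site if its class matches the core-API regex, the newly added SQL regex, or the Scala prefix. The demo object and the sample class names are illustrative only; the regexes are copied from the diff below.

```scala
// Standalone sketch of the exclusion rule described in this commit (hypothetical demo
// object, not Spark code). A frame is skipped when its class is Spark-internal or Scala.
object CallSiteExclusionDemo {
  val SPARK_CORE_CLASS_REGEX =
    """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
  val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r // added by this patch

  def isSparkInternal(className: String): Boolean =
    SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
      SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined ||
      className.startsWith("scala")

  def main(args: Array[String]): Unit = {
    println(isSparkInternal("org.apache.spark.rdd.RDD"))       // true: already excluded
    println(isSparkInternal("org.apache.spark.sql.DataFrame")) // true: newly excluded by the SQL regex
    println(isSparkInternal("com.example.MyApp"))               // false: reported as the call site
  }
}
```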

File tree

2 files changed: 21 additions & 10 deletions

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 19 additions & 9 deletions
@@ -1158,16 +1158,18 @@ private[spark] object Utils extends Logging {
   }
 
   /** Default filtering function for finding call sites using `getCallSite`. */
-  private def coreExclusionFunction(className: String): Boolean = {
-    // A regular expression to match classes of the "core" Spark API that we want to skip when
-    // finding the call site of a method.
+  private def sparkInternalExclusionFunction(className: String): Boolean = {
+    // A regular expression to match classes of the internal Spark API's
+    // that we want to skip when finding the call site of a method.
     val SPARK_CORE_CLASS_REGEX =
       """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
+    val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r
     val SCALA_CORE_CLASS_PREFIX = "scala"
-    val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
+    val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
+      SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined
     val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX)
     // If the class is a Spark internal class or a Scala class, then exclude.
-    isSparkCoreClass || isScalaClass
+    isSparkClass || isScalaClass
   }
 
   /**
@@ -1177,7 +1179,7 @@ private[spark] object Utils extends Logging {
    *
    * @param skipClass Function that is used to exclude non-user-code classes.
    */
-  def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
+  def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
     // Keep crawling up the stack trace until we find the first function not inside of the spark
     // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
     // transformation, a SparkContext function (such as parallelize), or anything else that leads
@@ -1216,9 +1218,17 @@ private[spark] object Utils extends Logging {
     }
 
     val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
-    CallSite(
-      shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
-      longForm = callStack.take(callStackDepth).mkString("\n"))
+    val shortForm =
+      if (firstUserFile == "HiveSessionImpl.java") {
+        // To be more user friendly, show a nicer string for queries submitted from the JDBC
+        // server.
+        "Spark JDBC Server Query"
+      } else {
+        s"$lastSparkMethod at $firstUserFile:$firstUserLine"
+      }
+    val longForm = callStack.take(callStackDepth).mkString("\n")
+
+    CallSite(shortForm, longForm)
   }
 
   /** Return a string containing part of a file from byte 'start' to 'end'. */
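As a rough illustration of the surrounding mechanism, the sketch below mimics, outside of Spark, what the patched getCallSite does: walk the current stack trace, skip frames whose class the predicate marks as internal, and substitute a friendlier label for queries arriving through the Hive JDBC server. All names here are hypothetical, and the real implementation additionally tracks the shallowest Spark method when building the "method at file:line" string.

```scala
// Hypothetical stand-ins (not Spark code): FakeSparkInternals plays the role of Spark's
// Utils object, UserApp plays the role of user code.
object FakeSparkInternals {
  case class CallSite(shortForm: String, longForm: String)

  def findCallSite(skipClass: String => Boolean): CallSite = {
    // Drop the getStackTrace frame itself, then look for the first non-internal frame.
    val stack = Thread.currentThread().getStackTrace.drop(1)
    val firstUserFrame = stack.find(frame => !skipClass(frame.getClassName))
    val shortForm = firstUserFrame match {
      // Mirrors the patch: JDBC server queries get a friendly label instead of a meaningless frame.
      case Some(f) if f.getFileName == "HiveSessionImpl.java" => "Spark JDBC Server Query"
      case Some(f) => s"${f.getMethodName} at ${f.getFileName}:${f.getLineNumber}"
      case None => "(unknown call site)"
    }
    CallSite(shortForm, stack.take(20).mkString("\n"))
  }
}

object UserApp {
  def main(args: Array[String]): Unit = {
    // Skip JDK, Scala, and "internal" frames; the first remaining frame is this main method,
    // so shortForm comes out as something like "main at UserApp.scala:NN".
    val skip = (c: String) =>
      c.startsWith("java.") || c.startsWith("scala.") || c.startsWith("FakeSparkInternals")
    println(FakeSparkInternals.findCallSite(skip).shortForm)
  }
}
```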

python/pyspark/sql/dataframe.py

Lines changed: 2 additions & 1 deletion
@@ -236,7 +236,8 @@ def explain(self, extended=False):
         :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
 
         >>> df.explain()
-        PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
+        PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
+          NativeMethodAccessorImpl.java:...
 
         >>> df.explain(True)
         == Parsed Logical Plan ==
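Finally, a hedged usage sketch of the user-visible effect on the Scala side (the driver object, SQLContext setup, and data are illustrative assumptions, not part of the commit): with org.apache.spark.sql frames excluded, the call site recorded for DataFrame operations, and therefore the labels seen in explain() output and the Spark UI, point back at the user's source line.

```scala
// Illustrative driver snippet (assumes a Spark 1.4-era SQLContext is passed in; names are
// hypothetical). After this patch, the recorded call site for DataFrame work resolves to
// the user's line below rather than to an internal org.apache.spark.sql frame.
import org.apache.spark.sql.SQLContext

object CallSiteDemoApp {
  def run(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    val df = sqlContext.sparkContext
      .parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
      .toDF("id", "name")

    df.filter("id > 1").explain()  // prints the physical plan for this query
    df.filter("id > 1").collect()  // the UI job description now reads like "collect at CallSiteDemoApp.scala:...",
                                   // not a frame inside DataFrame.scala
  }
}
```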
