Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Removed outdated changes to QueryExecution
  • Loading branch information
DaveDeCaprio committed Nov 18, 2018
commit 0681f059648ef3f7f00a488b947d2d7185364614
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.spark.sql.execution

import java.io.{BufferedWriter, OutputStreamWriter, Writer}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import org.apache.commons.io.output.StringBuilderWriter
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -189,24 +193,38 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
""".stripMargin.trim
}

private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
try f(writer)
catch {
case e: AnalysisException => writer.write(e.toString)
}
}

private def writePlans(writer: Writer): Unit = {
val (verbose, addSuffix) = (true, false)

writer.write("== Parsed Logical Plan ==\n")
writeOrError(writer)(logical.treeString(_, verbose, addSuffix))
writer.write("\n== Analyzed Logical Plan ==\n")
val analyzedOutput = stringOrError(Utils.truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
writer.write(analyzedOutput)
writer.write("\n")
writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix))
writer.write("\n== Optimized Logical Plan ==\n")
writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix))
writer.write("\n== Physical Plan ==\n")
writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix))
}

override def toString: String = withRedaction {
def output = Utils.truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
val analyzedPlan = Seq(
stringOrError(output),
stringOrError(analyzed.treeString(verbose = true))
).filter(_.nonEmpty).mkString("\n")

val builder = new StringBuilder
builder.append("== Parsed Logical Plan ==\n")
builder.append(stringOrError(logical.treeString(verbose = true)))
builder.append("== Analyzed Logical Plan ==\n")
builder.append(analyzedPlan)
builder.append("== Optimized Logical Plan ==\n")
builder.append(stringOrError(optimizedPlan.treeString(verbose = true)))
builder.append("== Physical Plan ==\n")
builder.append(stringOrError(executedPlan.treeString(verbose = true)))
builder.toString
val writer = new StringBuilderWriter()
try {
writePlans(writer)
writer.toString
} finally {
writer.close()
}
}

def stringWithStats: String = withRedaction {
Expand Down Expand Up @@ -251,5 +269,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
def codegenToSeq(): Seq[(String, String)] = {
org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
}

/**
* Dumps debug information about query execution into the specified file.
*/
def toFile(path: String): Unit = {
val filePath = new Path(path)
val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))

try {
writePlans(writer)
writer.write("\n== Whole Stage Codegen ==\n")
org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
} finally {
writer.close()
}
}
}
}