update minor code
lianhuiwang committed May 10, 2016
commit 25b00d73325f97e1a4ff5a3795b068387a80a53f
@@ -10,13 +10,11 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{MapOutputStatistics, SimpleFutureAction, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.types.LongType

/**
* A physical plan tree is divided into a DAG tree of QueryFragment.
@@ -136,6 +134,7 @@ trait QueryFragment extends SparkPlan {

case agg @ TungstenAggregate(_, _, _, _, _, _, input @ FragmentInput(_))
if (!input.isOptimized())=> {
logInfo("Begin optimize agg, operator =\n" + agg.toString)
optimizeAggregate(agg, input)
}

@@ -262,6 +261,8 @@ case class RootQueryFragment (

private[this] val stopped = new AtomicBoolean(false)

override def nodeName: String = s"RootQueryFragment (fragment id: ${id})"

protected[sql] override def stageFailed(exception: Throwable): Unit = {
isThrowException = true
this.exception = exception
@@ -279,6 +280,7 @@ case class RootQueryFragment (
logInfo(s"Query Fragment ${id} finished")
parent.setChildCompleted(child, shuffleDependency, statistics)
if (parent.isAvailable) {
logInfo(s"Query Fragment ${parent.id} is available")
eventQueue.add(parent)
}
case scala.util.Failure(exception) =>
@@ -327,7 +329,10 @@ case class RootQueryFragment (
assert(this.exception != null)
throw exception
} else {
rootPlan.execute()
logInfo(s"== Submit Query Fragment ${id} Physical plan ==")
val executedPlan = sqlContext.sparkSession.sessionState.codegenForExecution(rootPlan)
logInfo(stringOrError(executedPlan.toString))
executedPlan.execute()
}
}

@@ -342,4 +347,7 @@ case class RootQueryFragment (
case class UnaryQueryFragment (
children: Seq[QueryFragment],
id: Long,
isRoot: Boolean = false) extends QueryFragment {}
isRoot: Boolean = false) extends QueryFragment {

override def nodeName: String = s"UnaryQueryFragment (fragment id: ${id})"
}
@@ -168,12 +168,11 @@ private[sql] class SessionState(sparkSession: SparkSession) {
def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)

def codegenForExecution(plan: SparkPlan): SparkPlan = {
val codegenRules: Seq[Rule[SparkPlan]] =
Seq(CollapseCodegenStages(sparkSession.sessionState.conf))
codegenRules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

protected def codegenRules: Seq[Rule[SparkPlan]] =
Seq(CollapseCodegenStages(sparkSession.sessionState.conf))

def refreshTable(tableName: String): Unit = {
catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
}
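
Note: the codegenForExecution change above folds a sequence of Rule[SparkPlan] over the physical plan before execution. Below is a minimal, self-contained sketch of that foldLeft-over-rules pattern; Plan, Scan, WholeStageCodegen, WrapForCodegen, and FoldRulesSketch are hypothetical stand-ins, not Spark's actual classes.

// Hypothetical stand-ins illustrating the pattern used by codegenForExecution.
trait Plan { def describe: String }

case class Scan(table: String) extends Plan {
  def describe: String = s"Scan($table)"
}

case class WholeStageCodegen(child: Plan) extends Plan {
  def describe: String = s"WholeStageCodegen(${child.describe})"
}

trait Rule[T] { def apply(plan: T): T }

// Stand-in for a codegen rule such as CollapseCodegenStages: wraps the plan
// in a codegen node.
object WrapForCodegen extends Rule[Plan] {
  def apply(plan: Plan): Plan = WholeStageCodegen(plan)
}

object FoldRulesSketch {
  // Same shape as codegenForExecution: apply each rule in order to the plan.
  def applyRules(plan: Plan, rules: Seq[Rule[Plan]]): Plan =
    rules.foldLeft(plan) { case (p, rule) => rule.apply(p) }

  def main(args: Array[String]): Unit = {
    val prepared = applyRules(Scan("events"), Seq(WrapForCodegen))
    println(prepared.describe)  // prints: WholeStageCodegen(Scan(events))
  }
}

Defining codegenRules as a local val inside the method (rather than a separate protected def) keeps the rule list next to the single place it is applied, which is the design choice this commit makes in SessionState.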