@@ -42,7 +42,7 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends L
  *
  * @tparam PhysicalPlan The type of physical plan produced by this [[QueryPlanner]]
  */
-abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
+trait QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
   /** A list of execution strategies that can be used by the planner */
   def strategies: Seq[GenericStrategy[PhysicalPlan]]
@@ -118,7 +118,7 @@ private[sql] object DataFrame {
 @Experimental
 class DataFrame private[sql](
     @transient val sqlContext: SQLContext,
-    @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution)
+    @DeveloperApi @transient val queryExecution: QueryExecution)
   extends RDDApi[Row] with Serializable {

   /**
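The old annotation `SQLContext#QueryExecution` was a type projection over an inner class, so the type of a query execution was entangled with its enclosing SQLContext; hoisting QueryExecution to a top-level class (see the new file section at the bottom of this diff) removes that coupling. A minimal, self-contained sketch of the Scala distinction — the `Outer`/`Inner` names are illustrative stand-ins, not from this PR:

```scala
// Illustration of path-dependent inner-class types vs. top-level classes.
class Outer {
  class Inner // each Outer instance carries its own Inner type
}

object TypeProjectionDemo extends App {
  val a = new Outer
  val b = new Outer
  val x: a.Inner = new a.Inner      // path-dependent: tied to instance `a`
  // val bad: a.Inner = new b.Inner // would not compile: b.Inner is not a.Inner
  val y: Outer#Inner = new b.Inner  // type projection: the Inner of *any* Outer
  println(s"$x $y")
}
```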
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala (234 changes: 95 additions, 139 deletions)

@@ -166,9 +166,9 @@ class SQLContext(@transient val sparkContext: SparkContext)

   protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

-  protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
+  protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))

-  protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
+  protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)

   @transient
   protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
@@ -784,90 +784,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }.toArray
   }

-  protected[sql] class SparkPlanner extends SparkStrategies {
-    val sparkContext: SparkContext = self.sparkContext
-
-    val sqlContext: SQLContext = self
-
-    def codegenEnabled: Boolean = self.conf.codegenEnabled
-
-    def unsafeEnabled: Boolean = self.conf.unsafeEnabled
-
-    def numPartitions: Int = self.conf.numShufflePartitions
-
-    def strategies: Seq[Strategy] =
-      experimental.extraStrategies ++ (
-      DataSourceStrategy ::
-      DDLStrategy ::
-      TakeOrdered ::
-      HashAggregation ::
-      LeftSemiJoin ::
-      HashJoin ::
-      InMemoryScans ::
-      ParquetOperations ::
-      BasicOperators ::
-      CartesianProduct ::
-      BroadcastNestedLoopJoin :: Nil)
-
-    /**
-     * Used to build table scan operators where complex projection and filtering are done using
-     * separate physical operators. This function returns the given scan operator with Project and
-     * Filter nodes added only when needed. For example, a Project operator is only used when the
-     * final desired output requires complex expressions to be evaluated or when columns can be
-     * further eliminated out after filtering has been done.
-     *
-     * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
-     * away by the filter pushdown optimization.
-     *
-     * The required attributes for both filtering and expression evaluation are passed to the
-     * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
-     */
-    def pruneFilterProject(
-        projectList: Seq[NamedExpression],
-        filterPredicates: Seq[Expression],
-        prunePushedDownFilters: Seq[Expression] => Seq[Expression],
-        scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
-
-      val projectSet = AttributeSet(projectList.flatMap(_.references))
-      val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
-      val filterCondition =
-        prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
-
-      // Right now we still use a projection even if the only evaluation is applying an alias
-      // to a column. Since this is a no-op, it could be avoided. However, using this
-      // optimization with the current implementation would change the output schema.
-      // TODO: Decouple final output schema from expression evaluation so this copy can be
-      // avoided safely.
-
-      if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
-          filterSet.subsetOf(projectSet)) {
-        // When it is possible to just use column pruning to get the right projection and
-        // when the columns of this projection are enough to evaluate all filter conditions,
-        // just do a scan followed by a filter, with no extra project.
-        val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
-        filterCondition.map(Filter(_, scan)).getOrElse(scan)
-      } else {
-        val scan = scanBuilder((projectSet ++ filterSet).toSeq)
-        Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
-      }
-    }
-  }
-
   @transient
-  protected[sql] val planner = new SparkPlanner
+  protected[sql] val planner = new SparkPlanner(this)

   @transient
   protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)

-  /**
-   * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed.
-   */
-  @transient
-  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
-    val batches =
-      Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil
-  }
-
   protected[sql] def openSession(): SQLSession = {
     detachSession()
     val session = createSession()
@@ -893,64 +815,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     protected[sql] lazy val conf: SQLConf = new SQLConf
   }

-  /**
-   * :: DeveloperApi ::
-   * The primary workflow for executing relational queries using Spark. Designed to allow easy
-   * access to the intermediate phases of query execution for developers.
-   */
-  @DeveloperApi
-  protected[sql] class QueryExecution(val logical: LogicalPlan) {
-    def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
-
-    lazy val analyzed: LogicalPlan = analyzer.execute(logical)
-    lazy val withCachedData: LogicalPlan = {
-      assertAnalyzed()
-      cacheManager.useCachedData(analyzed)
-    }
-    lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
-
-    // TODO: Don't just pick the first one...
-    lazy val sparkPlan: SparkPlan = {
-      SparkPlan.currentContext.set(self)
-      planner.plan(optimizedPlan).next()
-    }
-    // executedPlan should not be used to initialize any SparkPlan. It should be
-    // only used for execution.
-    lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
-
-    /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = executedPlan.execute()
-
-    protected def stringOrError[A](f: => A): String =
-      try f.toString catch { case e: Throwable => e.toString }
-
-    def simpleString: String =
-      s"""== Physical Plan ==
-         |${stringOrError(executedPlan)}
-      """.stripMargin.trim
-
-    override def toString: String = {
-      def output =
-        analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
-
-      // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
-      // however, the `toRdd` will cause the real execution, which is not what we want.
-      // We need to think about how to avoid the side effect.
-      s"""== Parsed Logical Plan ==
-         |${stringOrError(logical)}
-         |== Analyzed Logical Plan ==
-         |${stringOrError(output)}
-         |${stringOrError(analyzed)}
-         |== Optimized Logical Plan ==
-         |${stringOrError(optimizedPlan)}
-         |== Physical Plan ==
-         |${stringOrError(executedPlan)}
-         |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
-         |== RDD ==
-      """.stripMargin.trim
-    }
-  }
-
   /**
    * Parses the data type in our internal string representation. The data type string should
    * have the same format as the one generated by `toString` in scala.
@@ -1321,3 +1185,95 @@ object SQLContext {
     }
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * The primary workflow for executing relational queries using Spark. Designed to allow easy
+ * access to the intermediate phases of query execution for developers.
+ */
+@DeveloperApi
+protected[sql] class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
[Review comment — Contributor]
Shouldn't this be private[sql] now?

[Reply — Contributor (Author)]
It is protected[sql] in master.

BTW, this PR is geared towards making it easier for third parties (and HiveContext) to add new processing rules without requiring them to subclass (see PR #5556 and SPARK-6320).

I would actually advise making these classes public (or at least protected, without the [sql] qualifier).

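To ground the reply above: the `experimental.extraStrategies` hook referenced in SparkPlanner below already lets third parties inject planner rules without subclassing SQLContext; this PR extends that spirit to the rest of the pipeline. A hedged sketch against the Spark 1.x API of this PR's era — the exact import paths are an assumption, and `mySqlContext` is a placeholder:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A strategy that declines every plan: returning Nil makes the planner
// fall through to the next strategy in the list.
object NoOpStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// Injection point; no subclassing of SQLContext required:
//   mySqlContext.experimental.extraStrategies = NoOpStrategy :: Nil
```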
+  def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)
+
+  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
+  lazy val withCachedData: LogicalPlan = {
+    assertAnalyzed()
+    sqlContext.cacheManager.useCachedData(analyzed)
+  }
+  lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
+
+  // TODO: Don't just pick the first one...
+  lazy val sparkPlan: SparkPlan = {
+    SparkPlan.currentContext.set(sqlContext)
+    sqlContext.planner.plan(optimizedPlan).next()
+  }
+  // executedPlan should not be used to initialize any SparkPlan. It should be
+  // only used for execution.
+  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
+
+  /**
+   * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed.
+   */
+  @transient
+  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
+    val batches =
+      Batch("Add exchange", Once, EnsureRequirements(sqlContext)) :: Nil
+  }
+
+  /** Internal version of the RDD. Avoids copies and has no schema */
+  lazy val toRdd: RDD[Row] = executedPlan.execute()
+
+  protected def stringOrError[A](f: => A): String =
+    try f.toString catch { case e: Throwable => e.toString }
+
+  def simpleString: String =
+    s"""== Physical Plan ==
+       |${stringOrError(executedPlan)}
+    """.stripMargin.trim
+
+  override def toString: String = {
+    def output =
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+
+    // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
+    // however, the `toRdd` will cause the real execution, which is not what we want.
+    // We need to think about how to avoid the side effect.
+    s"""== Parsed Logical Plan ==
+       |${stringOrError(logical)}
+       |== Analyzed Logical Plan ==
+       |${stringOrError(output)}
+       |${stringOrError(analyzed)}
+       |== Optimized Logical Plan ==
+       |${stringOrError(optimizedPlan)}
+       |== Physical Plan ==
+       |${stringOrError(executedPlan)}
+       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
+       |== RDD ==
+    """.stripMargin.trim
+  }
+}
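Since the class is advertised as giving developers access to the intermediate phases of query execution, a minimal usage sketch may help — assuming a SQLContext named `sqlContext` and a registered table `people`, neither of which is part of this diff:

```scala
val df = sqlContext.sql("SELECT name FROM people WHERE age > 21")
val qe = df.queryExecution    // the QueryExecution constructed by this context

println(qe.logical)           // parsed logical plan
println(qe.analyzed)          // after the analyzer
println(qe.optimizedPlan)     // after the optimizer
println(qe.executedPlan)      // physical plan, with exchanges inserted
val rows = qe.toRdd.collect() // triggers actual execution
```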


+protected[sql] class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
+  val sparkContext: SparkContext = sqlContext.sparkContext
+
+  def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled
+
+  def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled
+
+  def numPartitions: Int = sqlContext.conf.numShufflePartitions
+
+  def strategies: Seq[Strategy] =
+    sqlContext.experimental.extraStrategies ++ (
+    DataSourceStrategy ::
+    DDLStrategy ::
+    TakeOrdered ::
+    HashAggregation ::
+    LeftSemiJoin ::
+    HashJoin ::
+    InMemoryScans ::
+    ParquetOperations ::
+    BasicOperators ::
+    CartesianProduct ::
+    BroadcastNestedLoopJoin :: Nil)
+}
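One consequence of this layout, worth noting since `sparkPlan` above calls `planner.plan(optimizedPlan).next()`: the planner tries each strategy in order and the first emitted physical plan wins (hence the "Don't just pick the first one" TODO). A simplified, self-contained sketch of that pattern — not the actual Spark implementation, and all names here are illustrative only:

```scala
// Generic first-match planner loop.
trait SimpleStrategy[I, O] {
  def apply(input: I): Seq[O] // zero candidates means "not my case"
}

class SimplePlanner[I, O](strategies: Seq[SimpleStrategy[I, O]]) {
  // Lazily concatenates candidates from each strategy in declaration order;
  // callers typically take the first via `.next()`, mirroring sparkPlan above.
  def plan(input: I): Iterator[O] =
    strategies.iterator.flatMap(s => s(input))
}
```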