From 83668c154c15c631a80a17bc8f96a911c8fccff4 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Wed, 13 May 2015 10:57:47 +0200 Subject: [PATCH 1/8] Re-defines abstract class QueryPlanner as trait --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 51b5699affed..f4a7ff3a2975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -42,7 +42,7 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends L * * @tparam PhysicalPlan The type of physical plan produced by this [[QueryPlanner]] */ -abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { +trait QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] From 45d6c9d340ee4726356301efdfaf929ed25b1f2b Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Wed, 13 May 2015 10:26:25 +0200 Subject: [PATCH 2/8] Make abstract class SparkStrategies a trait; remove self-type declaration --- .../spark/sql/execution/SparkStrategies.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index af0029cb84f9..2ab290600f9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ @@ -29,8 +30,12 @@ import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, Des import org.apache.spark.sql.types._ import org.apache.spark.sql.{SQLContext, Strategy, execution} -private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { - self: SQLContext#SparkPlanner => +private[sql] trait SparkStrategies extends QueryPlanner[SparkPlan] { + def sqlContext: SQLContext + def sparkContext: SparkContext + def codeGenEnabled: Boolean + def unsafeEnabled: Boolean + def numPartitions: Int object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { @@ -277,7 +282,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { - def numPartitions: Int = self.numPartitions + def numPartitions: Int = sqlContext.conf.numPartitions def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil @@ -355,7 +360,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") case LogicalDescribeCommand(table, isExtended) => - val resultPlan = self.sqlContext.executePlan(table).executedPlan + val resultPlan = sqlContext.executePlan(table).executedPlan ExecutedCommand( RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil From f6222676157f4ca137e8b24dfa7c20d40de70442 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Wed, 13 May 2015 10:27:04 +0200 Subject: [PATCH 3/8] Remove 'this' from 'QueryExecution' type declaration --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 70ba8985d634..5c29dc69334a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -196,9 +196,9 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) - protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql)) + protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql)) - protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan) + protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(plan) @transient protected[sql] val tlSession = new ThreadLocal[SQLSession]() { From acf7e2026873870951631d7a96dc6f39abd55f29 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Wed, 13 May 2015 10:54:50 +0200 Subject: [PATCH 4/8] Refactor out QueryExecution from SQLContext and HiveContext --- .../org/apache/spark/sql/DataFrame.scala | 2 +- .../org/apache/spark/sql/SQLContext.scala | 232 +++++++----------- .../spark/sql/execution/SparkStrategies.scala | 59 ++++- .../thriftserver/AbstractSparkSQLDriver.scala | 2 +- .../apache/spark/sql/hive/HiveContext.scala | 76 +++--- .../spark/sql/hive/HiveStrategies.scala | 11 +- .../apache/spark/sql/hive/test/TestHive.scala | 12 +- 7 files changed, 198 insertions(+), 196 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 01fd432cc819..e46f1dad480a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -119,7 +119,7 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, - @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) + @DeveloperApi @transient val queryExecution: QueryExecution) extends RDDApi[Row] with Serializable { /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5c29dc69334a..0ae620dab30c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -198,7 +198,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql)) - protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(plan) + protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan) @transient protected[sql] val tlSession = new ThreadLocal[SQLSession]() { @@ -1143,90 +1143,13 @@ 
class SQLContext(@transient val sparkContext: SparkContext) }.toArray } - protected[sql] class SparkPlanner extends SparkStrategies { - val sparkContext: SparkContext = self.sparkContext - - val sqlContext: SQLContext = self - - def codegenEnabled: Boolean = self.conf.codegenEnabled - - def unsafeEnabled: Boolean = self.conf.unsafeEnabled - - def numPartitions: Int = self.conf.numShufflePartitions - - def strategies: Seq[Strategy] = - experimental.extraStrategies ++ ( - DataSourceStrategy :: - DDLStrategy :: - TakeOrdered :: - HashAggregation :: - LeftSemiJoin :: - HashJoin :: - InMemoryScans :: - ParquetOperations :: - BasicOperators :: - CartesianProduct :: - BroadcastNestedLoopJoin :: Nil) - - /** - * Used to build table scan operators where complex projection and filtering are done using - * separate physical operators. This function returns the given scan operator with Project and - * Filter nodes added only when needed. For example, a Project operator is only used when the - * final desired output requires complex expressions to be evaluated or when columns can be - * further eliminated out after filtering has been done. - * - * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized - * away by the filter pushdown optimization. - * - * The required attributes for both filtering and expression evaluation are passed to the - * provided `scanBuilder` function so that it can avoid unnecessary column materialization. - */ - def pruneFilterProject( - projectList: Seq[NamedExpression], - filterPredicates: Seq[Expression], - prunePushedDownFilters: Seq[Expression] => Seq[Expression], - scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { - - val projectSet = AttributeSet(projectList.flatMap(_.references)) - val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) - val filterCondition = - prunePushedDownFilters(filterPredicates).reduceLeftOption(expressions.And) - - // Right now we still use a projection even if the only evaluation is applying an alias - // to a column. Since this is a no-op, it could be avoided. However, using this - // optimization with the current implementation would change the output schema. - // TODO: Decouple final output schema from expression evaluation so this copy can be - // avoided safely. - - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && - filterSet.subsetOf(projectSet)) { - // When it is possible to just use column pruning to get the right projection and - // when the columns of this projection are enough to evaluate all filter conditions, - // just do a scan followed by a filter, with no extra project. - val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) - filterCondition.map(Filter(_, scan)).getOrElse(scan) - } else { - val scan = scanBuilder((projectSet ++ filterSet).toSeq) - Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) - } - } - } @transient - protected[sql] val planner = new SparkPlanner + protected[sql] val planner = new SparkPlanner(this) @transient protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1) - /** - * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed. 
- */ - @transient - protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { - val batches = - Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil - } - protected[sql] def openSession(): SQLSession = { detachSession() val session = createSession() @@ -1252,64 +1175,6 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val conf: SQLConf = new SQLConf } - /** - * :: DeveloperApi :: - * The primary workflow for executing relational queries using Spark. Designed to allow easy - * access to the intermediate phases of query execution for developers. - */ - @DeveloperApi - protected[sql] class QueryExecution(val logical: LogicalPlan) { - def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) - - lazy val analyzed: LogicalPlan = analyzer.execute(logical) - lazy val withCachedData: LogicalPlan = { - assertAnalyzed() - cacheManager.useCachedData(analyzed) - } - lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) - - // TODO: Don't just pick the first one... - lazy val sparkPlan: SparkPlan = { - SparkPlan.currentContext.set(self) - planner(optimizedPlan).next() - } - // executedPlan should not be used to initialize any SparkPlan. It should be - // only used for execution. - lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) - - /** Internal version of the RDD. Avoids copies and has no schema */ - lazy val toRdd: RDD[Row] = executedPlan.execute() - - protected def stringOrError[A](f: => A): String = - try f.toString catch { case e: Throwable => e.toString } - - def simpleString: String = - s"""== Physical Plan == - |${stringOrError(executedPlan)} - """.stripMargin.trim - - override def toString: String = { - def output = - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") - - // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) - // however, the `toRdd` will cause the real execution, which is not what we want. - // We need to think about how to avoid the side effect. - s"""== Parsed Logical Plan == - |${stringOrError(logical)} - |== Analyzed Logical Plan == - |${stringOrError(output)} - |${stringOrError(analyzed)} - |== Optimized Logical Plan == - |${stringOrError(optimizedPlan)} - |== Physical Plan == - |${stringOrError(executedPlan)} - |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} - |== RDD == - """.stripMargin.trim - } - } - /** * Parses the data type in our internal string representation. The data type string should * have the same format as the one generated by `toString` in scala. @@ -1379,3 +1244,96 @@ class SQLContext(@transient val sparkContext: SparkContext) } + +/** + * :: DeveloperApi :: + * The primary workflow for executing relational queries using Spark. Designed to allow easy + * access to the intermediate phases of query execution for developers. + */ +@DeveloperApi +protected[sql] class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { + def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed) + + lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical) + lazy val withCachedData: LogicalPlan = { + assertAnalyzed() + sqlContext.cacheManager.useCachedData(analyzed) + } + lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData) + + // TODO: Don't just pick the first one... 
+ lazy val sparkPlan: SparkPlan = { + SparkPlan.currentContext.set(sqlContext) + sqlContext.planner(optimizedPlan).next() + } + // executedPlan should not be used to initialize any SparkPlan. It should be + // only used for execution. + lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) + + /** + * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed. + */ + @transient + protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { + val batches = + Batch("Add exchange", Once, EnsureRequirements(sqlContext)) :: Nil + } + + /** Internal version of the RDD. Avoids copies and has no schema */ + lazy val toRdd: RDD[Row] = executedPlan.execute() + + protected def stringOrError[A](f: => A): String = + try f.toString catch { case e: Throwable => e.toString } + + def simpleString: String = + s"""== Physical Plan == + |${stringOrError(executedPlan)} + """.stripMargin.trim + + override def toString: String = { + def output = + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") + + // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) + // however, the `toRdd` will cause the real execution, which is not what we want. + // We need to think about how to avoid the side effect. + s"""== Parsed Logical Plan == + |${stringOrError(logical)} + |== Analyzed Logical Plan == + |${stringOrError(output)} + |${stringOrError(analyzed)} + |== Optimized Logical Plan == + |${stringOrError(optimizedPlan)} + |== Physical Plan == + |${stringOrError(executedPlan)} + |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} + |== RDD == + """.stripMargin.trim + } +} + + +protected[sql] class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies { + val sparkContext: SparkContext = sqlContext.sparkContext + + def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled + + def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled + + def numPartitions: Int = sqlContext.conf.numShufflePartitions + + def strategies: Seq[Strategy] = + sqlContext.experimental.extraStrategies ++ ( + DataSourceStrategy :: + DDLStrategy :: + TakeOrdered :: + HashAggregation :: + LeftSemiJoin :: + HashJoin :: + InMemoryScans :: + ParquetOperations :: + BasicOperators :: + CartesianProduct :: + BroadcastNestedLoopJoin :: Nil) + +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2ab290600f9d..e65154fa868f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -25,18 +25,63 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} + import org.apache.spark.sql.parquet._ import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{SQLContext, Strategy, execution} +import org.apache.spark.sql.{catalyst, SQLContext, Strategy, execution} -private[sql] trait SparkStrategies extends QueryPlanner[SparkPlan] { +private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def 
sqlContext: SQLContext def sparkContext: SparkContext - def codeGenEnabled: Boolean + def codegenEnabled: Boolean def unsafeEnabled: Boolean def numPartitions: Int + + /** + * Used to build table scan operators where complex projection and filtering are done using + * separate physical operators. This function returns the given scan operator with Project and + * Filter nodes added only when needed. For example, a Project operator is only used when the + * final desired output requires complex expressions to be evaluated or when columns can be + * further eliminated out after filtering has been done. + * + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. + * + * The required attributes for both filtering and expression evaluation are passed to the + * provided `scanBuilder` function so that it can avoid unnecessary column materialization. + */ + def pruneFilterProject( + projectList: Seq[NamedExpression], + filterPredicates: Seq[Expression], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], + scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { + + val projectSet = AttributeSet(projectList.flatMap(_.references)) + val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) + val filterCondition = + prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) + + // Right now we still use a projection even if the only evaluation is applying an alias + // to a column. Since this is a no-op, it could be avoided. However, using this + // optimization with the current implementation would change the output schema. + // TODO: Decouple final output schema from expression evaluation so this copy can be + // avoided safely. + + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && + filterSet.subsetOf(projectSet)) { + // When it is possible to just use column pruning to get the right projection and + // when the columns of this projection are enough to evaluate all filter conditions, + // just do a scan followed by a filter, with no extra project. 
+ val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) + filterCondition.map(Filter(_, scan)).getOrElse(scan) + } else { + val scan = scanBuilder((projectSet ++ filterSet).toSeq) + Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) + } + } + object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) @@ -136,7 +181,7 @@ private[sql] trait SparkStrategies extends QueryPlanner[SparkPlan] { if canBeCodeGened( allAggregates(partialComputation) ++ allAggregates(rewrittenAggregateExpressions)) && - codegenEnabled => + sqlContext.planner.codegenEnabled => execution.GeneratedAggregate( partial = false, namedGroupingAttributes, @@ -255,7 +300,7 @@ private[sql] trait SparkStrategies extends QueryPlanner[SparkPlan] { } else { identity[Seq[Expression]] _ } - pruneFilterProject( + sqlContext.planner.pruneFilterProject( projectList, filters, prunePushedDownFilters, @@ -271,7 +316,7 @@ private[sql] trait SparkStrategies extends QueryPlanner[SparkPlan] { object InMemoryScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, filters, mem: InMemoryRelation) => - pruneFilterProject( + sqlContext.planner.pruneFilterProject( projectList, filters, identity[Seq[Expression]], // All filters still need to be evaluated. @@ -282,7 +327,7 @@ private[sql] trait SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { - def numPartitions: Int = sqlContext.conf.numPartitions + def numPartitions: Int = sqlContext.planner.numPartitions def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala index 48ac9062af96..d3ae3391c34a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala @@ -36,7 +36,7 @@ private[hive] abstract class AbstractSparkSQLDriver( override def init(): Unit = { } - private def getResultSetSchema(query: context.QueryExecution): Schema = { + private def getResultSetSchema(query: QueryExecution): Schema = { val analyzed = query.analyzed logDebug(s"Result Schema: ${analyzed.output}") if (analyzed.output.size == 0) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 61e8c154e8c3..400369964707 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -217,8 +217,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { super.parseSql(substitutor.substitute(hiveconf, sql)) } - override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = - new this.QueryExecution(plan) + override protected[sql] def executePlan(plan: LogicalPlan): HiveQueryExecution = + new HiveQueryExecution(this, plan) /** * Invalidate and refresh all the cached the metadata of the given table. 
For performance reasons, @@ -385,8 +385,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } @transient - private val hivePlanner = new SparkPlanner with HiveStrategies { - val hiveContext = self + private val hivePlanner = new SparkPlanner(this) with HiveStrategies { + val hiveContext = sqlContext override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq( DataSourceStrategy, @@ -423,43 +423,45 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient override protected[sql] val planner = hivePlanner - /** Extends QueryExecution with hive specific features. */ - protected[sql] class QueryExecution(logicalPlan: LogicalPlan) - extends super.QueryExecution(logicalPlan) { +} - /** - * Returns the result as a hive compatible sequence of strings. For native commands, the - * execution is simply passed back to Hive. - */ - def stringResult(): Seq[String] = executedPlan match { - case ExecutedCommand(desc: DescribeHiveTableCommand) => - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. - desc.run(self).map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } - case command: ExecutedCommand => - command.executeCollect().map(_(0).toString) - - case other => - val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq - // We need the types so we can output struct field names - val types = analyzed.output.map(_.dataType) - // Reformat to match hive tab delimited output. - result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq - } - override def simpleString: String = - logical match { - case _: HiveNativeCommand => "" - case _: SetCommand => "" - case _ => super.simpleString +/** Extends QueryExecution with hive specific features. */ +protected[sql] class HiveQueryExecution(hiveContext: HiveContext, logicalPlan: LogicalPlan) + extends QueryExecution(hiveContext, logicalPlan) { + + /** + * Returns the result as a hive compatible sequence of strings. For native commands, the + * execution is simply passed back to Hive. + */ + def stringResult(): Seq[String] = executedPlan match { + case ExecutedCommand(desc: DescribeHiveTableCommand) => + // If it is a describe command for a Hive table, we want to have the output format + // be similar with Hive. + desc.run(sqlContext).map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, + Option(comment.asInstanceOf[String]).getOrElse("")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") } + case command: ExecutedCommand => + command.executeCollect().map(_(0).toString) + + case other => + val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq + // We need the types so we can output struct field names + val types = analyzed.output.map(_.dataType) + // Reformat to match hive tab delimited output. 
+ result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq } + + override def simpleString: String = + logical match { + case _: HiveNativeCommand => "" + case _: SetCommand => "" + case _ => super.simpleString + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index d46a127d47d3..1c864e70d2af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -35,11 +35,8 @@ import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect, import org.apache.spark.sql.types.StringType -private[hive] trait HiveStrategies { - // Possibly being too clever with types here... or not clever enough. - self: SQLContext#SparkPlanner => - - val hiveContext: HiveContext +private[hive] trait HiveStrategies extends SparkStrategies { + def hiveContext: HiveContext /** * :: Experimental :: @@ -137,7 +134,7 @@ private[hive] trait HiveStrategies { val partitionLocations = partitions.map(_.getLocation) if (partitionLocations.isEmpty) { - PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil + PhysicalRDD(plan.output, hiveContext.sparkContext.emptyRDD[Row]) :: Nil } else { hiveContext .parquetFile(partitionLocations: _*) @@ -165,7 +162,7 @@ private[hive] trait HiveStrategies { // TODO: Remove this hack for Spark 1.3. case iae: java.lang.IllegalArgumentException if iae.getMessage.contains("Can not create a Path from an empty string") => - PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil + PhysicalRDD(plan.output, hiveContext.sparkContext.emptyRDD[Row]) :: Nil } case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 1598d4bd4755..bf362a3d455e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -93,8 +93,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { override def runSqlHive(sql: String): Seq[String] = super.runSqlHive(rewritePaths(substitutor.substitute(this.hiveconf, sql))) - override def executePlan(plan: LogicalPlan): this.QueryExecution = - new this.QueryExecution(plan) + override def executePlan(plan: LogicalPlan): TestQueryExecution = + new TestQueryExecution(this, plan) override protected[sql] def createSession(): SQLSession = { new this.SQLSession() @@ -158,9 +158,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { /** * Override QueryExecution with special debug workflow. 
*/ - class QueryExecution(logicalPlan: LogicalPlan) - extends super.QueryExecution(logicalPlan) { - def this(sql: String) = this(parseSql(sql)) + class TestQueryExecution(hiveContext: HiveContext, logicalPlan: LogicalPlan) + extends HiveQueryExecution(hiveContext, logicalPlan) { + def this(sql: String) = this(self, parseSql(sql)) override lazy val analyzed = { val describedTables = logical match { case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil @@ -184,7 +184,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { protected[hive] implicit class SqlCmd(sql: String) { def cmd: () => Unit = { - () => new QueryExecution(sql).stringResult(): Unit + () => new TestQueryExecution(sql).stringResult(): Unit } } From 718ef719a5cb40a41c4d2324e50a74316ec98119 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Wed, 13 May 2015 17:59:07 +0200 Subject: [PATCH 5/8] Cast sqlContext to HiveContext in HiveContext.hivePlanner --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 400369964707..bf9524f25a85 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -386,7 +386,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient private val hivePlanner = new SparkPlanner(this) with HiveStrategies { - val hiveContext = sqlContext + val hiveContext = sqlContext.asInstanceOf[HiveContext] override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq( DataSourceStrategy, From fcb9ee4eed6bb947624360dd2ab5823988468ae4 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Wed, 13 May 2015 17:59:34 +0200 Subject: [PATCH 6/8] Re-align QueryExecution name in PruningSuite --- .../org/apache/spark/sql/hive/execution/PruningSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index de6a41ce5bfc..0a2952ffef0d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -145,7 +145,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { expectedScannedColumns: Seq[String], expectedPartValues: Seq[Seq[String]]): Unit = { test(s"$testCaseName - pruning test") { - val plan = new TestHive.QueryExecution(sql).executedPlan + val plan = new TestHive.TestQueryExecution(sql).executedPlan val actualOutputColumns = plan.output.map(_.name) val (actualScannedColumns, actualPartValues) = plan.collect { case p @ HiveTableScan(columns, relation, _) => From 903824860c4db96e64e8ea797c100e4c344e75e5 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Wed, 13 May 2015 18:11:20 +0200 Subject: [PATCH 7/8] Fix build errors --- .../sql/hive/thriftserver/AbstractSparkSQLDriver.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/test/TestHive.scala | 8 ++++---- .../apache/spark/sql/hive/execution/PruningSuite.scala | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala index d3ae3391c34a..4d3bb31d682c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.Logging -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} +import org.apache.spark.sql.hive.{HiveContext, HiveQueryExecution, HiveMetastoreTypes} private[hive] abstract class AbstractSparkSQLDriver( val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging { @@ -36,7 +36,7 @@ private[hive] abstract class AbstractSparkSQLDriver( override def init(): Unit = { } - private def getResultSetSchema(query: QueryExecution): Schema = { + private def getResultSetSchema(query: HiveQueryExecution): Schema = { val analyzed = query.analyzed logDebug(s"Result Schema: ${analyzed.output}") if (analyzed.output.size == 0) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index bf362a3d455e..5d657b64415b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -93,8 +93,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { override def runSqlHive(sql: String): Seq[String] = super.runSqlHive(rewritePaths(substitutor.substitute(this.hiveconf, sql))) - override def executePlan(plan: LogicalPlan): TestQueryExecution = - new TestQueryExecution(this, plan) + override def executePlan(plan: LogicalPlan): this.QueryExecution = + new this.QueryExecution(this, plan) override protected[sql] def createSession(): SQLSession = { new this.SQLSession() @@ -158,7 +158,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { /** * Override QueryExecution with special debug workflow. 
*/ - class TestQueryExecution(hiveContext: HiveContext, logicalPlan: LogicalPlan) + class QueryExecution(hiveContext: HiveContext, logicalPlan: LogicalPlan) extends HiveQueryExecution(hiveContext, logicalPlan) { def this(sql: String) = this(self, parseSql(sql)) override lazy val analyzed = { @@ -184,7 +184,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { protected[hive] implicit class SqlCmd(sql: String) { def cmd: () => Unit = { - () => new TestQueryExecution(sql).stringResult(): Unit + () => new QueryExecution(sql).stringResult(): Unit } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 0a2952ffef0d..de6a41ce5bfc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -145,7 +145,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { expectedScannedColumns: Seq[String], expectedPartValues: Seq[Seq[String]]): Unit = { test(s"$testCaseName - pruning test") { - val plan = new TestHive.TestQueryExecution(sql).executedPlan + val plan = new TestHive.QueryExecution(sql).executedPlan val actualOutputColumns = plan.output.map(_.name) val (actualScannedColumns, actualPartValues) = plan.collect { case p @ HiveTableScan(columns, relation, _) => From 443d42f44bd849228dbdeb70722983104107a49e Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Thu, 14 May 2015 11:33:11 +0200 Subject: [PATCH 8/8] add `lazy val = sqlContext.planner` --- .../apache/spark/sql/execution/SparkStrategies.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index e65154fa868f..0c662767e1c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -38,7 +38,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def unsafeEnabled: Boolean def numPartitions: Int - + lazy val planner = sqlContext.planner + /** * Used to build table scan operators where complex projection and filtering are done using * separate physical operators. This function returns the given scan operator with Project and @@ -181,7 +182,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if canBeCodeGened( allAggregates(partialComputation) ++ allAggregates(rewrittenAggregateExpressions)) && - sqlContext.planner.codegenEnabled => + planner.codegenEnabled => execution.GeneratedAggregate( partial = false, namedGroupingAttributes, @@ -300,7 +301,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } else { identity[Seq[Expression]] _ } - sqlContext.planner.pruneFilterProject( + planner.pruneFilterProject( projectList, filters, prunePushedDownFilters, @@ -316,7 +317,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object InMemoryScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, filters, mem: InMemoryRelation) => - sqlContext.planner.pruneFilterProject( + planner.pruneFilterProject( projectList, filters, identity[Seq[Expression]], // All filters still need to be evaluated. 
@@ -327,7 +328,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { - def numPartitions: Int = sqlContext.planner.numPartitions + def numPartitions: Int = planner.numPartitions def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil
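
The common thread across these eight patches is the removal of Scala self-type ("cake pattern") coupling, such as `self: SQLContext#SparkPlanner =>` and the inner `QueryExecution`/`SparkPlanner` classes, in favor of top-level classes whose dependencies are ordinary abstract members or constructor parameters. The stand-alone sketch below illustrates that pattern in isolation; the names (`Strategies`, `Planner`, `numPartitions`) are simplified stand-ins for illustration, not Spark's actual API.

    // Before (sketch): the strategies class can only exist mixed into the planner,
    // because it reaches its dependencies through a self-type annotation.
    //
    //   abstract class Strategies { self: Planner =>
    //     def plan(query: String): String = s"[$numPartitions partitions] $query"
    //   }

    // After (sketch): the dependency becomes an abstract member, so the class can
    // live at the top level and be extended or tested independently.
    abstract class Strategies {
      def numPartitions: Int                        // previously supplied by the self-type
      def plan(query: String): String = s"[$numPartitions partitions] $query"
    }

    // A stand-alone planner satisfies the dependency via its constructor, mirroring
    // how SparkPlanner(sqlContext) and QueryExecution(sqlContext, plan) receive
    // their context explicitly in this patch series.
    class Planner(val numPartitions: Int) extends Strategies

    object Demo extends App {
      val planner = new Planner(numPartitions = 200)
      println(planner.plan("SELECT 1"))             // prints: [200 partitions] SELECT 1
    }
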