-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-44355][SQL] Move WithCTE into command queries #42036
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
4eabbd8
4349764
4f1cbee
a6633e4
25230f2
58bc261
fedc9a6
45672ac
8a36db1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,18 +20,18 @@ package org.apache.spark.sql.catalyst.analysis | |
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.SubqueryExpression | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE} | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.catalyst.trees.TreePattern._ | ||
| import org.apache.spark.sql.catalyst.util.TypeUtils._ | ||
| import org.apache.spark.sql.errors.QueryCompilationErrors | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy} | ||
|
|
||
| /** | ||
| * Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending | ||
| * on the conditions below: | ||
| * 1. If in legacy mode, or if the query is a SQL command or DML statement, replace with CTE | ||
| * definitions, i.e., inline CTEs. | ||
| * 1. If in legacy mode, replace with CTE definitions, i.e., inline CTEs. | ||
| * 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not | ||
| * inline will be made later by the rule `InlineCTE` after query analysis. | ||
| * | ||
|
|
@@ -46,42 +46,65 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega | |
| * dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A, | ||
| * A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an | ||
| * analysis exception will be thrown later by relation resolving rules. | ||
| * | ||
| * If the query is a SQL command or DML statement (extends `CTEInChildren`), | ||
| * place `WithCTE` into their children. | ||
| */ | ||
| object CTESubstitution extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = { | ||
| if (!plan.containsPattern(UNRESOLVED_WITH)) { | ||
| return plan | ||
| } | ||
| val isCommand = plan.exists { | ||
| case _: Command | _: ParsedStatement | _: InsertIntoDir => true | ||
| case _ => false | ||
|
|
||
| val forceInline = if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) { | ||
| // The legacy behavior always inlines the CTE relations for queries in commands. | ||
| plan.exists { | ||
| case _: Command | _: ParsedStatement | _: InsertIntoDir => true | ||
| case _ => false | ||
| } | ||
| } else { | ||
| val commands = plan.collect { | ||
| case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c | ||
| } | ||
| if (commands.length == 1) { | ||
| // If there is only one command and it's `CTEInChildren`, we can resolve | ||
| // CTE normally and don't need to force inline. | ||
| !commands.head.isInstanceOf[CTEInChildren] | ||
| } else if (commands.length > 1) { | ||
| // This can happen with the multi-insert statement. We should fall back to | ||
| // the legacy behavior. | ||
| true | ||
| } else { | ||
| false | ||
| } | ||
| } | ||
|
|
||
| val cteDefs = ArrayBuffer.empty[CTERelationDef] | ||
| val (substituted, firstSubstituted) = | ||
| LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { | ||
| case LegacyBehaviorPolicy.EXCEPTION => | ||
| assertNoNameConflictsInCTE(plan) | ||
| traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) | ||
| traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) | ||
| case LegacyBehaviorPolicy.LEGACY => | ||
| (legacyTraverseAndSubstituteCTE(plan, cteDefs), None) | ||
| case LegacyBehaviorPolicy.CORRECTED => | ||
| traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) | ||
| traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) | ||
| } | ||
| if (cteDefs.isEmpty) { | ||
| substituted | ||
| } else if (substituted eq firstSubstituted.get) { | ||
| WithCTE(substituted, cteDefs.toSeq) | ||
| withCTEDefs(substituted, cteDefs.toSeq) | ||
| } else { | ||
| var done = false | ||
| substituted.resolveOperatorsWithPruning(_ => !done) { | ||
| case p if p eq firstSubstituted.get => | ||
| // `firstSubstituted` is the parent of all other CTEs (if any). | ||
| done = true | ||
| WithCTE(p, cteDefs.toSeq) | ||
| withCTEDefs(p, cteDefs.toSeq) | ||
| case p if p.children.count(_.containsPattern(CTE)) > 1 => | ||
| // This is the first common parent of all CTEs. | ||
| done = true | ||
| WithCTE(p, cteDefs.toSeq) | ||
| withCTEDefs(p, cteDefs.toSeq) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -131,7 +154,7 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| plan.resolveOperatorsUp { | ||
| case UnresolvedWith(child, relations) => | ||
| val resolvedCTERelations = | ||
| resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs) | ||
| resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs) | ||
| substituteCTE(child, alwaysInline = true, resolvedCTERelations) | ||
| } | ||
| } | ||
|
|
@@ -168,27 +191,27 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| * SELECT * FROM t | ||
| * ) | ||
| * @param plan the plan to be traversed | ||
| * @param isCommand if this is a command | ||
| * @param forceInline always inline the CTE relations if this is true | ||
| * @param outerCTEDefs already resolved outer CTE definitions with names | ||
| * @param cteDefs all accumulated CTE definitions | ||
| * @return the plan where CTE substitution is applied and optionally the last substituted `With` | ||
| * where CTE definitions will be gathered to | ||
| */ | ||
| private def traverseAndSubstituteCTE( | ||
| plan: LogicalPlan, | ||
| isCommand: Boolean, | ||
| forceInline: Boolean, | ||
| outerCTEDefs: Seq[(String, CTERelationDef)], | ||
| cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { | ||
| var firstSubstituted: Option[LogicalPlan] = None | ||
| val newPlan = plan.resolveOperatorsDownWithPruning( | ||
| _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { | ||
| case UnresolvedWith(child: LogicalPlan, relations) => | ||
| val resolvedCTERelations = | ||
| resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++ | ||
| resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++ | ||
| outerCTEDefs | ||
| val substituted = substituteCTE( | ||
| traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1, | ||
| isCommand, | ||
| traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1, | ||
| forceInline, | ||
| resolvedCTERelations) | ||
| if (firstSubstituted.isEmpty) { | ||
| firstSubstituted = Some(substituted) | ||
|
|
@@ -206,10 +229,11 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| private def resolveCTERelations( | ||
| relations: Seq[(String, SubqueryAlias)], | ||
| isLegacy: Boolean, | ||
| isCommand: Boolean, | ||
| forceInline: Boolean, | ||
| outerCTEDefs: Seq[(String, CTERelationDef)], | ||
| cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = { | ||
| var resolvedCTERelations = if (isLegacy || isCommand) { | ||
| val alwaysInline = isLegacy || forceInline | ||
| var resolvedCTERelations = if (alwaysInline) { | ||
| Seq.empty | ||
| } else { | ||
| outerCTEDefs | ||
|
|
@@ -232,12 +256,12 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| // WITH t3 AS (SELECT * FROM t1) | ||
| // ) | ||
| // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`. | ||
| traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1 | ||
| traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1 | ||
| } | ||
| // CTE definition can reference a previous one | ||
| val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations) | ||
| val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations) | ||
| val cteRelation = CTERelationDef(substituted) | ||
| if (!(isLegacy || isCommand)) { | ||
| if (!alwaysInline) { | ||
| cteDefs += cteRelation | ||
| } | ||
| // Prepending new CTEs makes sure that those have higher priority over outer ones. | ||
|
|
@@ -249,7 +273,7 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| private def substituteCTE( | ||
| plan: LogicalPlan, | ||
| alwaysInline: Boolean, | ||
| cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = | ||
| cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = { | ||
| plan.resolveOperatorsUpWithPruning( | ||
| _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) { | ||
| case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _) | ||
|
|
@@ -273,4 +297,21 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations))) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * For commands which extend `CTEInChildren`, we should place the `WithCTE` node on its | ||
| * children. There are two reasons: | ||
| * 1. Some rules will pattern match the root command nodes, and we should keep command | ||
| * as the root node to not break them. | ||
| * 2. `Dataset` eagerly executes the commands inside a query plan. However, the CTE | ||
|
||
| * references inside commands will be invalid if we execute the command alone, as | ||
| * the CTE definitions are outside of the command. | ||
| */ | ||
| private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = { | ||
| p match { | ||
| case c: CTEInChildren => c.withCTEDefs(cteDefs) | ||
| case _ => WithCTE(p, cteDefs) | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -677,7 +677,7 @@ case class InsertIntoDir( | |
| provider: Option[String], | ||
| child: LogicalPlan, | ||
| overwrite: Boolean = true) | ||
| extends UnaryNode { | ||
| extends UnaryNode with CTEInChildren { | ||
|
|
||
| override def output: Seq[Attribute] = Seq.empty | ||
| override def metadataOutput: Seq[Attribute] = Nil | ||
|
|
@@ -887,6 +887,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * The logical node which is able to place the `WithCTE` node on its children. | ||
| */ | ||
| trait CTEInChildren extends LogicalPlan { | ||
| def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { | ||
| withNewChildren(children.map(WithCTE(_, cteDefs))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if it makes sense to assert that we have only 1 child.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems fine to have multiple children, we just duplicate the CTE relations. The current code does not allow it though, and go back to inline CTE.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure that it is always fine to duplicate CTE relations into multiple childrens. |
||
| } | ||
| } | ||
|
|
||
|
|
||
| case class WithWindowDefinition( | ||
| windowDefinitions: Map[String, WindowSpecDefinition], | ||
| child: LogicalPlan) extends UnaryNode { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,7 +46,7 @@ trait KeepAnalyzedQuery extends Command { | |
| /** | ||
| * Base trait for DataSourceV2 write commands | ||
| */ | ||
| trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery { | ||
| trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren { | ||
| def table: NamedRelation | ||
| def query: LogicalPlan | ||
| def isByName: Boolean | ||
|
|
@@ -392,9 +392,16 @@ case class WriteDelta( | |
| } | ||
| } | ||
|
|
||
| trait V2CreateTableAsSelectPlan extends V2CreateTablePlan with AnalysisOnlyCommand { | ||
| trait V2CreateTableAsSelectPlan | ||
| extends V2CreateTablePlan | ||
| with AnalysisOnlyCommand | ||
| with CTEInChildren { | ||
| def query: LogicalPlan | ||
|
|
||
| override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { | ||
| withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs)) | ||
| } | ||
|
|
||
| override lazy val resolved: Boolean = childrenResolved && { | ||
| // the table schema is created from the query schema, so the only resolution needed is to check | ||
| // that the columns referenced by the table's partitioning exist in the query schema | ||
|
|
@@ -1234,12 +1241,16 @@ case class RepairTable( | |
| case class AlterViewAs( | ||
| child: LogicalPlan, | ||
| originalText: String, | ||
| query: LogicalPlan) extends BinaryCommand { | ||
| query: LogicalPlan) extends BinaryCommand with CTEInChildren { | ||
| override def left: LogicalPlan = child | ||
| override def right: LogicalPlan = query | ||
| override protected def withNewChildrenInternal( | ||
| newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = | ||
| copy(child = newLeft, query = newRight) | ||
|
|
||
| override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { | ||
| withNewChildren(Seq(child, WithCTE(query, cteDefs))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not just
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1253,12 +1264,16 @@ case class CreateView( | |
| originalText: Option[String], | ||
| query: LogicalPlan, | ||
| allowExisting: Boolean, | ||
| replace: Boolean) extends BinaryCommand { | ||
| replace: Boolean) extends BinaryCommand with CTEInChildren { | ||
| override def left: LogicalPlan = child | ||
| override def right: LogicalPlan = query | ||
| override protected def withNewChildrenInternal( | ||
| newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = | ||
| copy(child = newLeft, query = newRight) | ||
|
|
||
| override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { | ||
| withNewChildren(Seq(child, WithCTE(query, cteDefs))) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3759,6 +3759,14 @@ object SQLConf { | |
| .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) | ||
| .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) | ||
|
|
||
| val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands") | ||
| .internal() | ||
| .doc("If true, always inline the CTE relations for the queries in commands. This is the " + | ||
| "legacy behavior which may produce incorrect results because Spark may evaluate a CTE " + | ||
| "relation more than once, even if it's nondeterministic.") | ||
| .booleanConf | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .createWithDefault(false) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need version here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch! |
||
|
|
||
| val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy") | ||
| .internal() | ||
| .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " + | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: There is duplicated logic here.
To make the code more readable, we can always collect the commands first. If the length of commands is 1, there is a different behavior based on the legacy conf. Otherwise the logic is determined.