From 4eabbd8c9e15098fa7f40c8940e2bebf6781f4ed Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 16 Jul 2023 12:26:13 +0300 Subject: [PATCH 1/9] Add WithCTEInChildren --- .../catalyst/plans/logical/basicLogicalOperators.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f8ba042009b2..84dc1d2b4d57 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -896,6 +896,15 @@ case class WithWindowDefinition( copy(child = newChild) } +/** + * The logical node is able to insert the given `WithCTE` into its children. + */ +trait WithCTEInChildren extends LogicalPlan { + def withCTE(withCTE: WithCTE): LogicalPlan = { + withNewChildren(children.map(withCTE.withNewPlan)) + } +} + /** * @param order The ordering expressions * @param global True means global sorting apply for entire data set, From 4349764f2d572ad2e6654ff17723e2fac995a613 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 16 Jul 2023 13:54:28 +0300 Subject: [PATCH 2/9] with WithCTEInChildren --- .../catalyst/analysis/CTESubstitution.scala | 7 ++--- .../plans/logical/basicLogicalOperators.scala | 2 +- .../catalyst/plans/logical/statements.scala | 2 +- .../catalyst/plans/logical/v2Commands.scala | 29 ++++++++++++++++--- .../command/DataWritingCommand.scala | 4 +-- .../InsertIntoDataSourceDirCommand.scala | 8 +++-- .../command/createDataSourceTables.scala | 10 +++++-- .../spark/sql/execution/command/tables.scala | 6 +++- .../spark/sql/execution/command/views.scala | 14 +++++++-- .../InsertIntoDataSourceCommand.scala | 8 +++-- .../InsertIntoHadoopFsRelationCommand.scala | 8 +++-- .../SaveIntoDataSourceCommand.scala | 8 +++-- .../CreateHiveTableAsSelectCommand.scala | 8 +++-- 13 files changed, 84 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 4e3234f9c0dc..84c9c55bbc9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LogicalPlan, SubqueryAlias, UnresolvedWith, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util.TypeUtils._ @@ -52,10 +52,7 @@ object CTESubstitution extends Rule[LogicalPlan] { if (!plan.containsPattern(UNRESOLVED_WITH)) { return plan } - val isCommand = plan.exists { - case _: Command | _: ParsedStatement | _: InsertIntoDir => true - case _ => false - } + val isCommand = false val cteDefs = ArrayBuffer.empty[CTERelationDef] val (substituted, firstSubstituted) = 
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 84dc1d2b4d57..917de521ede2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -677,7 +677,7 @@ case class InsertIntoDir( provider: Option[String], child: LogicalPlan, overwrite: Boolean = true) - extends UnaryNode { + extends UnaryNode with WithCTEInChildren { override def output: Seq[Attribute] = Seq.empty override def metadataOutput: Seq[Attribute] = Nil diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 669750ee448d..e9d565282f4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.types.DataType * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is be * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]]. */ -abstract class ParsedStatement extends LogicalPlan { +abstract class ParsedStatement extends LogicalPlan with WithCTEInChildren { // Redact properties and options when parsed nodes are used by generic methods like toString override def productIterator: Iterator[Any] = super.productIterator.map { case mapArg: Map[_, _] => conf.redactOptions(mapArg) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 739ffa487e39..dd701a2eb929 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -46,7 +46,7 @@ trait KeepAnalyzedQuery extends Command { /** * Base trait for DataSourceV2 write commands */ -trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery { +trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with WithCTEInChildren { def table: NamedRelation def query: LogicalPlan def isByName: Boolean @@ -392,9 +392,18 @@ case class WriteDelta( } } -trait V2CreateTableAsSelectPlan extends V2CreateTablePlan with AnalysisOnlyCommand { +trait V2CreateTableAsSelectPlan + extends V2CreateTablePlan + with AnalysisOnlyCommand + with WithCTEInChildren { def query: LogicalPlan + override def withCTE(withCTE: WithCTE): LogicalPlan = { + withNameAndQuery( + newName = this.name, + newQuery = withCTE.copy(plan = this.query)) + } + override lazy val resolved: Boolean = childrenResolved && { // the table schema is created from the query schema, so the only resolution needed is to check // that the columns referenced by the table's partitioning exist in the query schema @@ -1234,12 +1243,18 @@ case class RepairTable( case class AlterViewAs( child: LogicalPlan, originalText: String, - query: LogicalPlan) extends BinaryCommand { + query: LogicalPlan) extends BinaryCommand with WithCTEInChildren { override def left: LogicalPlan = child override def 
right: LogicalPlan = query override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = copy(child = newLeft, query = newRight) + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + withNewChildrenInternal( + newLeft = this.left, + newRight = withCTE.copy(plan = this.right)) + } } /** @@ -1253,12 +1268,18 @@ case class CreateView( originalText: Option[String], query: LogicalPlan, allowExisting: Boolean, - replace: Boolean) extends BinaryCommand { + replace: Boolean) extends BinaryCommand with WithCTEInChildren { override def left: LogicalPlan = child override def right: LogicalPlan = query override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = copy(child = newLeft, query = newRight) + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + withNewChildrenInternal( + newLeft = this.left, + newRight = withCTE.copy(plan = this.right)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 338ce8cac420..fcc7a0aac678 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand, WithCTEInChildren} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker @@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration /** * A special `Command` which writes data out and updates metrics. */ -trait DataWritingCommand extends UnaryCommand { +trait DataWritingCommand extends UnaryCommand with WithCTEInChildren { /** * The input query plan that produces the data to be written. 
* IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index 35c8bec37162..78fa701d5cda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ @@ -42,7 +42,7 @@ case class InsertIntoDataSourceDirCommand( storage: CatalogStorageFormat, provider: String, query: LogicalPlan, - overwrite: Boolean) extends LeafRunnableCommand { + overwrite: Boolean) extends LeafRunnableCommand with WithCTEInChildren { override def innerChildren: Seq[LogicalPlan] = query :: Nil @@ -76,4 +76,8 @@ case class InsertIntoDataSourceDirCommand( Seq.empty[Row] } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(query = withCTE.copy(plan = this.query)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3848d5505155..7b6d387ff50f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -21,8 +21,8 @@ import java.net.URI import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, removeInternalMetadata} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.CommandExecutionMode import org.apache.spark.sql.execution.datasources._ @@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand( mode: SaveMode, query: LogicalPlan, outputColumnNames: Seq[String]) - extends LeafRunnableCommand { + extends LeafRunnableCommand with WithCTEInChildren { assert(query.resolved) override def innerChildren: Seq[LogicalPlan] = query :: Nil @@ -233,4 +233,8 @@ case class CreateDataSourceTableAsSelectCommand( throw ex } } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(query = withCTE.copy(plan = this.query)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 351f6d5456d8..5298ac5e48e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -735,7 +735,7 @@ case class DescribeTableCommand( * 7. 
Common table expressions (CTEs) */ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan) - extends DescribeCommandBase { + extends DescribeCommandBase with WithCTEInChildren { override val output = DescribeCommandSchema.describeTableAttributes() @@ -747,6 +747,10 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan) describeSchema(queryExecution.analyzed.schema, result, header = false) result.toSeq } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(plan = withCTE.copy(plan = this.plan)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 3718794ea590..19aa56aff812 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View, WithCTE, WithCTEInChildren} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.errors.QueryCompilationErrors @@ -69,7 +69,7 @@ case class CreateViewCommand( viewType: ViewType, isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) - extends RunnableCommand with AnalysisOnlyCommand { + extends RunnableCommand with AnalysisOnlyCommand with WithCTEInChildren { import ViewHelper._ @@ -215,6 +215,10 @@ case class CreateViewCommand( comment = comment ) } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(plan = withCTE.copy(plan = this.plan)) + } } /** @@ -235,7 +239,7 @@ case class AlterViewAsCommand( query: LogicalPlan, isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) - extends RunnableCommand with AnalysisOnlyCommand { + extends RunnableCommand with AnalysisOnlyCommand with WithCTEInChildren { import ViewHelper._ @@ -307,6 +311,10 @@ case class AlterViewAsCommand( session.sessionState.catalog.alterTable(updatedViewMeta) } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(query = withCTE.copy(plan = this.query)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index 789b1d714fcb..b0e5482eb945 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} import 
org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.InsertableRelation @@ -31,7 +31,7 @@ case class InsertIntoDataSourceCommand( logicalRelation: LogicalRelation, query: LogicalPlan, overwrite: Boolean) - extends LeafRunnableCommand { + extends LeafRunnableCommand with WithCTEInChildren { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) @@ -47,4 +47,8 @@ case class InsertIntoDataSourceCommand( Seq.empty[Row] } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(query = withCTE.copy(plan = this.query)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index fe6ec094812e..fd2d52f1ac0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.SparkPlan @@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand( catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex], outputColumnNames: Seq[String]) - extends V1WriteCommand { + extends V1WriteCommand with WithCTEInChildren { private lazy val parameters = CaseInsensitiveMap(options) @@ -277,4 +277,8 @@ case class InsertIntoHadoopFsRelationCommand( override protected def withNewChildInternal( newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = newChild) + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + withNewChildInternal(withCTE.copy(plan = this.query)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 666ae9b5c6f3..81ba84f3713f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.CreatableRelationProvider @@ -39,7 +39,7 @@ case class SaveIntoDataSourceCommand( query: LogicalPlan, dataSource: CreatableRelationProvider, options: Map[String, String], - mode: SaveMode) extends LeafRunnableCommand { + mode: 
SaveMode) extends LeafRunnableCommand with WithCTEInChildren { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) @@ -68,4 +68,8 @@ case class SaveIntoDataSourceCommand( override def clone(): LogicalPlan = { SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode) } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(query = withCTE.copy(plan = this.query)) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 4127e7c75d79..64de738a3b64 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand} @@ -38,7 +38,7 @@ case class CreateHiveTableAsSelectCommand( query: LogicalPlan, outputColumnNames: Seq[String], mode: SaveMode) - extends LeafRunnableCommand { + extends LeafRunnableCommand with WithCTEInChildren { assert(query.resolved) override def innerChildren: Seq[LogicalPlan] = query :: Nil @@ -111,4 +111,8 @@ case class CreateHiveTableAsSelectCommand( s"[Database: ${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}]" } + + override def withCTE(withCTE: WithCTE): LogicalPlan = { + copy(query = withCTE.copy(plan = this.query)) + } } From 4f1cbee29b05690cdb13908036b91963bfd7d9db Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 16 Jul 2023 13:56:39 +0300 Subject: [PATCH 3/9] Rename WithCTEInChildren to CTEInChildren --- .../catalyst/plans/logical/basicLogicalOperators.scala | 4 ++-- .../spark/sql/catalyst/plans/logical/statements.scala | 2 +- .../spark/sql/catalyst/plans/logical/v2Commands.scala | 8 ++++---- .../spark/sql/execution/command/DataWritingCommand.scala | 4 ++-- .../command/InsertIntoDataSourceDirCommand.scala | 4 ++-- .../sql/execution/command/createDataSourceTables.scala | 4 ++-- .../org/apache/spark/sql/execution/command/tables.scala | 2 +- .../org/apache/spark/sql/execution/command/views.scala | 6 +++--- .../datasources/InsertIntoDataSourceCommand.scala | 4 ++-- .../datasources/InsertIntoHadoopFsRelationCommand.scala | 4 ++-- .../execution/datasources/SaveIntoDataSourceCommand.scala | 4 ++-- .../hive/execution/CreateHiveTableAsSelectCommand.scala | 4 ++-- 12 files changed, 25 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 917de521ede2..4cf09a9a734a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -677,7 +677,7 @@ case class InsertIntoDir( provider: Option[String], child: LogicalPlan, overwrite: Boolean = true) - extends 
UnaryNode with WithCTEInChildren { + extends UnaryNode with CTEInChildren { override def output: Seq[Attribute] = Seq.empty override def metadataOutput: Seq[Attribute] = Nil @@ -899,7 +899,7 @@ case class WithWindowDefinition( /** * The logical node is able to insert the given `WithCTE` into its children. */ -trait WithCTEInChildren extends LogicalPlan { +trait CTEInChildren extends LogicalPlan { def withCTE(withCTE: WithCTE): LogicalPlan = { withNewChildren(children.map(withCTE.withNewPlan)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index e9d565282f4b..9efc3b13bc26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.types.DataType * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is be * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]]. */ -abstract class ParsedStatement extends LogicalPlan with WithCTEInChildren { +abstract class ParsedStatement extends LogicalPlan with CTEInChildren { // Redact properties and options when parsed nodes are used by generic methods like toString override def productIterator: Iterator[Any] = super.productIterator.map { case mapArg: Map[_, _] => conf.redactOptions(mapArg) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index dd701a2eb929..0f31f0068195 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -46,7 +46,7 @@ trait KeepAnalyzedQuery extends Command { /** * Base trait for DataSourceV2 write commands */ -trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with WithCTEInChildren { +trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren { def table: NamedRelation def query: LogicalPlan def isByName: Boolean @@ -395,7 +395,7 @@ case class WriteDelta( trait V2CreateTableAsSelectPlan extends V2CreateTablePlan with AnalysisOnlyCommand - with WithCTEInChildren { + with CTEInChildren { def query: LogicalPlan override def withCTE(withCTE: WithCTE): LogicalPlan = { @@ -1243,7 +1243,7 @@ case class RepairTable( case class AlterViewAs( child: LogicalPlan, originalText: String, - query: LogicalPlan) extends BinaryCommand with WithCTEInChildren { + query: LogicalPlan) extends BinaryCommand with CTEInChildren { override def left: LogicalPlan = child override def right: LogicalPlan = query override protected def withNewChildrenInternal( @@ -1268,7 +1268,7 @@ case class CreateView( originalText: Option[String], query: LogicalPlan, allowExisting: Boolean, - replace: Boolean) extends BinaryCommand with WithCTEInChildren { + replace: Boolean) extends BinaryCommand with CTEInChildren { override def left: LogicalPlan = child override def right: LogicalPlan = query override protected def withNewChildrenInternal( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index fcc7a0aac678..2fbca82e82ee 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand, CTEInChildren} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker @@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration /** * A special `Command` which writes data out and updates metrics. */ -trait DataWritingCommand extends UnaryCommand with WithCTEInChildren { +trait DataWritingCommand extends UnaryCommand with CTEInChildren { /** * The input query plan that produces the data to be written. * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index 78fa701d5cda..495e5c9c1838 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ @@ -42,7 +42,7 @@ case class InsertIntoDataSourceDirCommand( storage: CatalogStorageFormat, provider: String, query: LogicalPlan, - overwrite: Boolean) extends LeafRunnableCommand with WithCTEInChildren { + overwrite: Boolean) extends LeafRunnableCommand with CTEInChildren { override def innerChildren: Seq[LogicalPlan] = query :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 7b6d387ff50f..d897e8644ecd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -21,7 +21,7 @@ import java.net.URI import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, removeInternalMetadata} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.CommandExecutionMode @@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand( mode: SaveMode, query: LogicalPlan, outputColumnNames: Seq[String]) - extends 
LeafRunnableCommand with WithCTEInChildren { + extends LeafRunnableCommand with CTEInChildren { assert(query.resolved) override def innerChildren: Seq[LogicalPlan] = query :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 5298ac5e48e7..30fcf6ccdaf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -735,7 +735,7 @@ case class DescribeTableCommand( * 7. Common table expressions (CTEs) */ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan) - extends DescribeCommandBase with WithCTEInChildren { + extends DescribeCommandBase with CTEInChildren { override val output = DescribeCommandSchema.describeTableAttributes() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 19aa56aff812..4636333c8212 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View, WithCTE, CTEInChildren} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.errors.QueryCompilationErrors @@ -69,7 +69,7 @@ case class CreateViewCommand( viewType: ViewType, isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) - extends RunnableCommand with AnalysisOnlyCommand with WithCTEInChildren { + extends RunnableCommand with AnalysisOnlyCommand with CTEInChildren { import ViewHelper._ @@ -239,7 +239,7 @@ case class AlterViewAsCommand( query: LogicalPlan, isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) - extends RunnableCommand with AnalysisOnlyCommand with WithCTEInChildren { + extends RunnableCommand with AnalysisOnlyCommand with CTEInChildren { import ViewHelper._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index b0e5482eb945..dd5e2b169105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} 
import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.InsertableRelation @@ -31,7 +31,7 @@ case class InsertIntoDataSourceCommand( logicalRelation: LogicalRelation, query: LogicalPlan, overwrite: Boolean) - extends LeafRunnableCommand with WithCTEInChildren { + extends LeafRunnableCommand with CTEInChildren { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index fd2d52f1ac0b..71453f1a2730 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.SparkPlan @@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand( catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex], outputColumnNames: Seq[String]) - extends V1WriteCommand with WithCTEInChildren { + extends V1WriteCommand with CTEInChildren { private lazy val parameters = CaseInsensitiveMap(options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 81ba84f3713f..8547364f1ead 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.CreatableRelationProvider @@ -39,7 +39,7 @@ case class SaveIntoDataSourceCommand( query: LogicalPlan, dataSource: CreatableRelationProvider, options: Map[String, String], - mode: SaveMode) extends LeafRunnableCommand with WithCTEInChildren { + mode: SaveMode) extends LeafRunnableCommand with CTEInChildren { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 64de738a3b64..ba09a036f088 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, WithCTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand} @@ -38,7 +38,7 @@ case class CreateHiveTableAsSelectCommand( query: LogicalPlan, outputColumnNames: Seq[String], mode: SaveMode) - extends LeafRunnableCommand with WithCTEInChildren { + extends LeafRunnableCommand with CTEInChildren { assert(query.resolved) override def innerChildren: Seq[LogicalPlan] = query :: Nil From a6633e46f55add50455cd91f186a2eb6aeecbcf5 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 16 Jul 2023 14:05:52 +0300 Subject: [PATCH 4/9] Remove isCommand --- .../catalyst/analysis/CTESubstitution.scala | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 84c9c55bbc9a..9424d67e725a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -52,17 +52,16 @@ object CTESubstitution extends Rule[LogicalPlan] { if (!plan.containsPattern(UNRESOLVED_WITH)) { return plan } - val isCommand = false val cteDefs = ArrayBuffer.empty[CTERelationDef] val (substituted, firstSubstituted) = LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { case LegacyBehaviorPolicy.EXCEPTION => assertNoNameConflictsInCTE(plan) - traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, Seq.empty, cteDefs) case LegacyBehaviorPolicy.LEGACY => (legacyTraverseAndSubstituteCTE(plan, cteDefs), None) case LegacyBehaviorPolicy.CORRECTED => - traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, Seq.empty, cteDefs) } if (cteDefs.isEmpty) { substituted @@ -128,7 +127,7 @@ object CTESubstitution extends Rule[LogicalPlan] { plan.resolveOperatorsUp { case UnresolvedWith(child, relations) => val resolvedCTERelations = - resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs) + resolveCTERelations(relations, isLegacy = true, Seq.empty, cteDefs) substituteCTE(child, alwaysInline = true, resolvedCTERelations) } } @@ -165,7 +164,6 @@ object CTESubstitution extends Rule[LogicalPlan] { * SELECT * FROM t * ) * @param plan the plan to be traversed - * @param isCommand if this is a command * @param outerCTEDefs already resolved outer CTE definitions with names * @param cteDefs all accumulated CTE definitions * @return the plan where CTE substitution is applied and optionally the last substituted `With` @@ -173,7 +171,6 @@ object CTESubstitution extends Rule[LogicalPlan] { */ private def traverseAndSubstituteCTE( plan: LogicalPlan, - isCommand: Boolean, outerCTEDefs: Seq[(String, 
CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { var firstSubstituted: Option[LogicalPlan] = None @@ -181,11 +178,11 @@ object CTESubstitution extends Rule[LogicalPlan] { _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { case UnresolvedWith(child: LogicalPlan, relations) => val resolvedCTERelations = - resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++ + resolveCTERelations(relations, isLegacy = false, outerCTEDefs, cteDefs) ++ outerCTEDefs val substituted = substituteCTE( - traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1, - isCommand, + traverseAndSubstituteCTE(child, resolvedCTERelations, cteDefs)._1, + false, resolvedCTERelations) if (firstSubstituted.isEmpty) { firstSubstituted = Some(substituted) @@ -203,10 +200,9 @@ object CTESubstitution extends Rule[LogicalPlan] { private def resolveCTERelations( relations: Seq[(String, SubqueryAlias)], isLegacy: Boolean, - isCommand: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = { - var resolvedCTERelations = if (isLegacy || isCommand) { + var resolvedCTERelations = if (isLegacy) { Seq.empty } else { outerCTEDefs @@ -229,12 +225,12 @@ object CTESubstitution extends Rule[LogicalPlan] { // WITH t3 AS (SELECT * FROM t1) // ) // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`. - traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1 + traverseAndSubstituteCTE(relation, resolvedCTERelations, cteDefs)._1 } // CTE definition can reference a previous one - val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations) + val substituted = substituteCTE(innerCTEResolved, isLegacy, resolvedCTERelations) val cteRelation = CTERelationDef(substituted) - if (!(isLegacy || isCommand)) { + if (!(isLegacy)) { cteDefs += cteRelation } // Prepending new CTEs makes sure that those have higher priority over outer ones. 
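To make the mechanics of the new trait concrete before the placement logic arrives in patch 5, here is a minimal, self-contained Scala sketch. The Plan classes below are simplified stand-ins for Catalyst's LogicalPlan, not Spark code; only the shape of CTEInChildren.withCTE mirrors the series. The point: a command that mixes in the trait receives the WithCTE wrapper in its children rather than above itself.

// Toy model, illustration only -- these are not Spark's Catalyst types.
sealed trait Plan {
  def children: Seq[Plan]
  def withNewChildren(newChildren: Seq[Plan]): Plan
}
case class Relation(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}
case class WithCTE(plan: Plan, cteDefs: Seq[String]) extends Plan {
  def children: Seq[Plan] = Seq(plan)
  def withNewChildren(newChildren: Seq[Plan]): Plan = copy(plan = newChildren.head)
  def withNewPlan(newPlan: Plan): WithCTE = copy(plan = newPlan)
}
// Default behavior from the trait: wrap every child in the given WithCTE,
// discarding the carrier's own plan, just as withNewPlan does in the patch.
trait CTEInChildren extends Plan {
  def withCTE(withCTE: WithCTE): Plan =
    withNewChildren(children.map(withCTE.withNewPlan))
}
case class InsertIntoDir(query: Plan) extends Plan with CTEInChildren {
  def children: Seq[Plan] = Seq(query)
  def withNewChildren(newChildren: Seq[Plan]): Plan = copy(query = newChildren.head)
}

object Demo extends App {
  val cmd = InsertIntoDir(Relation("s"))
  val carrier = WithCTE(cmd, Seq("s AS (SELECT 1)"))
  println(cmd.withCTE(carrier))
  // prints: InsertIntoDir(WithCTE(Relation(s),List(s AS (SELECT 1))))
}

This is also why the per-command overrides in patch 2 all reduce to some variant of copy(query = withCTE.copy(plan = this.query)): the command stays at the root of the plan, and the CTE definitions travel down into the query it reads from.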
From 25230f2b7536a3e11465b9f8926271e615a6456b Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 16 Jul 2023 14:58:49 +0300 Subject: [PATCH 5/9] Add withCTEDefs() --- .../catalyst/analysis/CTESubstitution.scala | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 9424d67e725a..09bf49d39360 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LogicalPlan, SubqueryAlias, UnresolvedWith, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, CTERelationRef, LogicalPlan, SubqueryAlias, UnresolvedWith, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util.TypeUtils._ @@ -30,8 +30,7 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega /** * Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending * on the conditions below: - * 1. If in legacy mode, or if the query is a SQL command or DML statement, replace with CTE - * definitions, i.e., inline CTEs. + * 1. If in legacy mode, replace with CTE definitions, i.e., inline CTEs. * 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not * inline will be made later by the rule `InlineCTE` after query analysis. * @@ -46,6 +45,9 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega * dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A, * A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an * analysis exception will be thrown later by relation resolving rules. + * + * If the query is a SQL command or DML statement (extends `CTEInChildren`), + * place `WithCTE` into their children. */ object CTESubstitution extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { @@ -66,18 +68,18 @@ object CTESubstitution extends Rule[LogicalPlan] { if (cteDefs.isEmpty) { substituted } else if (substituted eq firstSubstituted.get) { - WithCTE(substituted, cteDefs.toSeq) + withCTEDefs(substituted, cteDefs.toSeq) } else { var done = false substituted.resolveOperatorsWithPruning(_ => !done) { case p if p eq firstSubstituted.get => // `firstSubstituted` is the parent of all other CTEs (if any). done = true - WithCTE(p, cteDefs.toSeq) + withCTEDefs(p, cteDefs.toSeq) case p if p.children.count(_.containsPattern(CTE)) > 1 => // This is the first common parent of all CTEs. 
         done = true
-        WithCTE(p, cteDefs.toSeq)
+        withCTEDefs(p, cteDefs.toSeq)
     }
   }
 }
@@ -242,7 +244,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
   private def substituteCTE(
       plan: LogicalPlan,
       alwaysInline: Boolean,
-      cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan =
+      cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
     plan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
       case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
@@ -266,4 +268,21 @@ object CTESubstitution extends Rule[LogicalPlan] {
         e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
     }
   }
+  }
+
+  /**
+   * Finds all logical nodes that should have `WithCTE` in their children, like
+   * `InsertIntoStatement`, and puts `WithCTE` on top of their children instead of
+   * on top of the whole plan. If there are no such nodes, puts `WithCTE` on top.
+   */
+  private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    val withCTE = WithCTE(p, cteDefs)
+    var onTop = true
+    val newPlan = p.resolveOperatorsDown {
+      case cteInChildren: CTEInChildren =>
+        onTop = false
+        cteInChildren.withCTE(withCTE)
+    }
+    if (onTop) withCTE else newPlan
+  }
 }

From 58bc2612bc768b7e36c45153dcb9709dc4206d71 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sun, 16 Jul 2023 22:04:12 +0500
Subject: [PATCH 6/9] Fix coding style issues

---
 .../spark/sql/execution/command/DataWritingCommand.scala      | 2 +-
 .../execution/command/InsertIntoDataSourceDirCommand.scala    | 2 +-
 .../spark/sql/execution/command/createDataSourceTables.scala  | 4 ++--
 .../scala/org/apache/spark/sql/execution/command/views.scala  | 2 +-
 .../execution/datasources/InsertIntoDataSourceCommand.scala   | 2 +-
 .../datasources/InsertIntoHadoopFsRelationCommand.scala       | 2 +-
 .../sql/execution/datasources/SaveIntoDataSourceCommand.scala | 2 +-
 .../sql/hive/execution/CreateHiveTableAsSelectCommand.scala   | 2 +-
 8 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 2fbca82e82ee..592ae04a055d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand, CTEInChildren}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, UnaryCommand}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index 495e5c9c1838..0a9064261c7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -19,7 +19,7 @@ package 
org.apache.spark.sql.execution.command import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index d897e8644ecd..b1b2fd53c74a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -21,8 +21,8 @@ import java.net.URI import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} -import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, removeInternalMetadata} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.CommandExecutionMode import org.apache.spark.sql.execution.datasources._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 4636333c8212..8a12b162f994 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View, WithCTE, CTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, LogicalPlan, Project, View, WithCTE} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.errors.QueryCompilationErrors diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index dd5e2b169105..7cffd6efdb70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} import 
org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.InsertableRelation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 71453f1a2730..1c98854b81cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.SparkPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 8547364f1ead..2d76e7c3afa4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.CreatableRelationProvider diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index ba09a036f088..5bf04460f522 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE, CTEInChildren} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand} From fedc9a61530c8f4e000b87d0c74f92213a0f6954 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 17 Jul 2023 15:35:59 +0800 Subject: [PATCH 7/9] fix --- .../catalyst/analysis/CTESubstitution.scala | 71 +++++--- 
.../plans/logical/basicLogicalOperators.scala | 19 +-- .../catalyst/plans/logical/v2Commands.scala | 20 +-- .../apache/spark/sql/internal/SQLConf.scala | 8 + .../InsertIntoDataSourceDirCommand.scala | 6 +- .../command/createDataSourceTables.scala | 6 +- .../spark/sql/execution/command/tables.scala | 4 +- .../spark/sql/execution/command/views.scala | 10 +- .../InsertIntoDataSourceCommand.scala | 6 +- .../InsertIntoHadoopFsRelationCommand.scala | 8 +- .../SaveIntoDataSourceCommand.scala | 6 +- .../double-quoted-identifiers-enabled.sql.out | 14 +- .../analyzer-results/cte-command.sql.out | 152 ++++++++++++++++++ .../analyzer-results/postgreSQL/with.sql.out | 12 +- .../sql-tests/inputs/cte-command.sql | 33 ++++ .../sql-tests/results/cte-command.sql.out | 121 ++++++++++++++ .../CreateHiveTableAsSelectCommand.scala | 6 +- 17 files changed, 422 insertions(+), 80 deletions(-) create mode 100644 sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out create mode 100644 sql/core/src/test/resources/sql-tests/inputs/cte-command.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/cte-command.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 09bf49d39360..6da3e1bb88de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, CTERelationRef, LogicalPlan, SubqueryAlias, UnresolvedWith, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util.TypeUtils._ import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy} /** @@ -54,16 +55,40 @@ object CTESubstitution extends Rule[LogicalPlan] { if (!plan.containsPattern(UNRESOLVED_WITH)) { return plan } + + val forceInline = if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) { + // The legacy behavior always inlines the CTE relations for queries in commands. + plan.exists { + case _: Command | _: ParsedStatement | _: InsertIntoDir => true + case _ => false + } + } else { + val commands = plan.collect { + case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c + } + if (commands.length == 1) { + // If there is only one command and it's `CTEInChildren`, we can resolve + // CTE normally and don't need to force inline. + !commands.head.isInstanceOf[CTEInChildren] + } else if (commands.length > 1) { + // This can happen with the multi-insert statement. We should fall back to + // the legacy behavior. 
+ true + } else { + false + } + } + val cteDefs = ArrayBuffer.empty[CTERelationDef] val (substituted, firstSubstituted) = LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { case LegacyBehaviorPolicy.EXCEPTION => assertNoNameConflictsInCTE(plan) - traverseAndSubstituteCTE(plan, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) case LegacyBehaviorPolicy.LEGACY => (legacyTraverseAndSubstituteCTE(plan, cteDefs), None) case LegacyBehaviorPolicy.CORRECTED => - traverseAndSubstituteCTE(plan, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) } if (cteDefs.isEmpty) { substituted @@ -129,7 +154,7 @@ object CTESubstitution extends Rule[LogicalPlan] { plan.resolveOperatorsUp { case UnresolvedWith(child, relations) => val resolvedCTERelations = - resolveCTERelations(relations, isLegacy = true, Seq.empty, cteDefs) + resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs) substituteCTE(child, alwaysInline = true, resolvedCTERelations) } } @@ -166,6 +191,7 @@ object CTESubstitution extends Rule[LogicalPlan] { * SELECT * FROM t * ) * @param plan the plan to be traversed + * @param forceInline always inline the CTE relations if this is true * @param outerCTEDefs already resolved outer CTE definitions with names * @param cteDefs all accumulated CTE definitions * @return the plan where CTE substitution is applied and optionally the last substituted `With` @@ -173,6 +199,7 @@ object CTESubstitution extends Rule[LogicalPlan] { */ private def traverseAndSubstituteCTE( plan: LogicalPlan, + forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { var firstSubstituted: Option[LogicalPlan] = None @@ -180,11 +207,11 @@ object CTESubstitution extends Rule[LogicalPlan] { _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { case UnresolvedWith(child: LogicalPlan, relations) => val resolvedCTERelations = - resolveCTERelations(relations, isLegacy = false, outerCTEDefs, cteDefs) ++ + resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++ outerCTEDefs val substituted = substituteCTE( - traverseAndSubstituteCTE(child, resolvedCTERelations, cteDefs)._1, - false, + traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1, + forceInline, resolvedCTERelations) if (firstSubstituted.isEmpty) { firstSubstituted = Some(substituted) @@ -202,9 +229,11 @@ object CTESubstitution extends Rule[LogicalPlan] { private def resolveCTERelations( relations: Seq[(String, SubqueryAlias)], isLegacy: Boolean, + forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = { - var resolvedCTERelations = if (isLegacy) { + val alwaysInline = isLegacy || forceInline + var resolvedCTERelations = if (alwaysInline) { Seq.empty } else { outerCTEDefs @@ -227,12 +256,12 @@ object CTESubstitution extends Rule[LogicalPlan] { // WITH t3 AS (SELECT * FROM t1) // ) // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`. 
- traverseAndSubstituteCTE(relation, resolvedCTERelations, cteDefs)._1 + traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1 } // CTE definition can reference a previous one - val substituted = substituteCTE(innerCTEResolved, isLegacy, resolvedCTERelations) + val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations) val cteRelation = CTERelationDef(substituted) - if (!(isLegacy)) { + if (!alwaysInline) { cteDefs += cteRelation } // Prepending new CTEs makes sure that those have higher priority over outer ones. @@ -271,18 +300,18 @@ } /** - * Finds all logical nodes that should have `WithCTE` in their children like - * `InsertIntoStatement`, put `WithCTE` on top of the children and don't place `WithCTE` - * on top of the plan. If there are no such nodes, put `WithCTE` on the top. + * For commands which extend `CTEInChildren`, we should place the `WithCTE` node on their + * children. There are two reasons: + * 1. Some rules will pattern match the root command nodes, and we should keep the command + * as the root node to not break them. + * 2. `Dataset` eagerly executes the commands inside a query plan. However, the CTE + * references inside commands will be invalid if we execute the command alone, as + * the CTE definitions are outside of the command. */ private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = { - val withCTE = WithCTE(p, cteDefs) - var onTop = true - val newPlan = p.resolveOperatorsDown { - case cteInChildren: CTEInChildren => - onTop = false - cteInChildren.withCTE(withCTE) + p match { + case c: CTEInChildren => c.withCTEDefs(cteDefs) + case _ => WithCTE(p, cteDefs) } - if (onTop) withCTE else WithCTE(newPlan, cteDefs) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 4cf09a9a734a..7258d985db84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -887,6 +887,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi } } +/** + * The logical node which is able to place the `WithCTE` node on its children. + */ +trait CTEInChildren extends LogicalPlan { + def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNewChildren(children.map(WithCTE(_, cteDefs))) + } +} + + case class WithWindowDefinition( windowDefinitions: Map[String, WindowSpecDefinition], child: LogicalPlan) extends UnaryNode { @@ -896,15 +906,6 @@ case class WithWindowDefinition( copy(child = newChild) } -/** - * The logical node is able to insert the given `WithCTE` into its children.
- */ -trait CTEInChildren extends LogicalPlan { - def withCTE(withCTE: WithCTE): LogicalPlan = { - withNewChildren(children.map(withCTE.withNewPlan)) - } -} - /** * @param order The ordering expressions * @param global True means global sorting apply for entire data set, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 0f31f0068195..5c83da1a96aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -393,15 +393,13 @@ case class WriteDelta( } trait V2CreateTableAsSelectPlan - extends V2CreateTablePlan + extends V2CreateTablePlan with AnalysisOnlyCommand with CTEInChildren { def query: LogicalPlan - override def withCTE(withCTE: WithCTE): LogicalPlan = { - withNameAndQuery( - newName = this.name, - newQuery = withCTE.copy(plan = this.query)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs)) } override lazy val resolved: Boolean = childrenResolved && { @@ -1250,10 +1248,8 @@ case class AlterViewAs( newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = copy(child = newLeft, query = newRight) - override def withCTE(withCTE: WithCTE): LogicalPlan = { - withNewChildrenInternal( - newLeft = this.left, - newRight = withCTE.copy(plan = this.right)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNewChildren(Seq(child, WithCTE(query, cteDefs))) } } @@ -1275,10 +1271,8 @@ case class CreateView( newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = copy(child = newLeft, query = newRight) - override def withCTE(withCTE: WithCTE): LogicalPlan = { - withNewChildrenInternal( - newLeft = this.left, - newRight = withCTE.copy(plan = this.right)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNewChildren(Seq(child, WithCTE(query, cteDefs))) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9093b0745a6c..e231884033b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3759,6 +3759,14 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands") + .internal() + .doc("If true, always inline the CTE relations for the queries in commands. 
This is the " + + "legacy behavior which may produce incorrect results because Spark may evaluate a CTE " + + "relation more than once, even if it's nondeterministic.") + .booleanConf + .createWithDefault(false) + val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy") .internal() .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index 0a9064261c7a..67d38b28c83e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ @@ -77,7 +77,7 @@ case class InsertIntoDataSourceDirCommand( Seq.empty[Row] } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(query = withCTE.copy(plan = this.query)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index b1b2fd53c74a..54e8181c56cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -21,7 +21,7 @@ import java.net.URI import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.CommandExecutionMode @@ -234,7 +234,7 @@ case class CreateDataSourceTableAsSelectCommand( } } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(query = withCTE.copy(plan = this.query)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 30fcf6ccdaf5..88e940ffdc78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -748,8 +748,8 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan) result.toSeq } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(plan = withCTE.copy(plan = this.plan)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(plan = WithCTE(plan, cteDefs)) } } diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 8a12b162f994..8ac982b9bdd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, LogicalPlan, Project, View, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.errors.QueryCompilationErrors @@ -216,8 +216,8 @@ case class CreateViewCommand( ) } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(plan = withCTE.copy(plan = this.plan)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(plan = WithCTE(plan, cteDefs)) } } @@ -312,8 +312,8 @@ case class AlterViewAsCommand( session.sessionState.catalog.alterTable(updatedViewMeta) } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(query = withCTE.copy(plan = this.query)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index 7cffd6efdb70..128f6acdeaa6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.InsertableRelation @@ -48,7 +48,7 @@ case class InsertIntoDataSourceCommand( Seq.empty[Row] } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(query = withCTE.copy(plan = this.query)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 1c98854b81cb..fe6ec094812e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.SparkPlan @@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand( catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex], outputColumnNames: Seq[String]) - extends V1WriteCommand with CTEInChildren { + extends V1WriteCommand { private lazy val parameters = CaseInsensitiveMap(options) @@ -277,8 +277,4 @@ case class InsertIntoHadoopFsRelationCommand( override protected def withNewChildInternal( newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = newChild) - - override def withCTE(withCTE: WithCTE): LogicalPlan = { - withNewChildInternal(withCTE.copy(plan = this.query)) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 2d76e7c3afa4..5423232db429 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.CreatableRelationProvider @@ -69,7 +69,7 @@ case class SaveIntoDataSourceCommand( SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode) } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(query = withCTE.copy(plan = this.query)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) } } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out index 2a6bcce99d19..a155bccd0919 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out @@ -410,11 +410,15 @@ CREATE TEMPORARY VIEW "myview"("c1") AS WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v" -- !query analysis CreateViewCommand `myview`, [(c1,None)], WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v", false, false, LocalTempView, true - +- Project [a#x] - +- SubqueryAlias v - +- Project [1#x AS a#x] - 
+- Project [1 AS 1#x] - +- OneRowRelation + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [1#x AS a#x] + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [a#x] + +- SubqueryAlias v + +- CTERelationRef xxxx, true, [a#x] -- !query diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out new file mode 100644 index 000000000000..93dab5aaa6df --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out @@ -0,0 +1,152 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query analysis +CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorIfExists, [col] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [42 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_tbl +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query analysis +CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, false, false, LocalTempView, true + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [42 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_view +-- !query analysis +Project [col#x] ++- SubqueryAlias cte_view + +- View (`cte_view`, [col#x]) + +- Project [cast(col#x as int) AS col#x] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [42 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +WITH s AS (SELECT 43 AS col) +INSERT INTO cte_tbl SELECT * FROM S +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col] ++- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [43 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias S + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_tbl +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col] ++- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [44 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_tbl 
+-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +CREATE TABLE cte_tbl2 (col INT) USING csv +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`cte_tbl2`, false + + +-- !query +WITH s AS (SELECT 45 AS col) +FROM s +INSERT INTO cte_tbl SELECT col +INSERT INTO cte_tbl2 SELECT col +-- !query analysis +Union false, false +:- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col] +: +- Project [col#x] +: +- SubqueryAlias s +: +- Project [45 AS col#x] +: +- OneRowRelation ++- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl2, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl2], Append, `spark_catalog`.`default`.`cte_tbl2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl2), [col] + +- Project [col#x] + +- SubqueryAlias s + +- Project [45 AS col#x] + +- OneRowRelation + + +-- !query +SELECT * FROM cte_tbl +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +SELECT * FROM cte_tbl2 +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl2 + +- Relation spark_catalog.default.cte_tbl2[col#x] csv + + +-- !query +DROP TABLE cte_tbl +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl + + +-- !query +DROP TABLE cte_tbl2 +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl2 diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out index e53480e96bed..f58f8faa0be3 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out @@ -452,10 +452,14 @@ with test as (select 42) insert into test select * from test -- !query analysis InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/test, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/test], Append, `spark_catalog`.`default`.`test`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/test), [i] +- Project [cast(42#x as int) AS i#x] - +- Project [42#x] - +- SubqueryAlias test - +- Project [42 AS 42#x] - +- OneRowRelation + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias test + : +- Project [42 AS 42#x] + : +- OneRowRelation + +- Project [42#x] + +- SubqueryAlias test + +- CTERelationRef xxxx, true, [42#x] -- !query diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql new file mode 100644 index 000000000000..ee90c2de49eb --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql @@ -0,0 +1,33 @@ +-- WITH inside CTAS +CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s; + +SELECT *
FROM cte_tbl; + +-- WITH inside CREATE VIEW +CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s; + +SELECT * FROM cte_view; + +-- INSERT inside WITH +WITH s AS (SELECT 43 AS col) +INSERT INTO cte_tbl SELECT * FROM S; + +SELECT * FROM cte_tbl; + +-- WITH inside INSERT +INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s; + +SELECT * FROM cte_tbl; + +CREATE TABLE cte_tbl2 (col INT) USING csv; +-- Multi-INSERT +WITH s AS (SELECT 45 AS col) +FROM s +INSERT INTO cte_tbl SELECT col +INSERT INTO cte_tbl2 SELECT col; + +SELECT * FROM cte_tbl; +SELECT * FROM cte_tbl2; + +DROP TABLE cte_tbl; +DROP TABLE cte_tbl2; diff --git a/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out new file mode 100644 index 000000000000..67ac321a1954 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out @@ -0,0 +1,121 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct +-- !query output +42 + + +-- !query +CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_view +-- !query schema +struct +-- !query output +42 + + +-- !query +WITH s AS (SELECT 43 AS col) +INSERT INTO cte_tbl SELECT * FROM S +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct +-- !query output +42 +43 + + +-- !query +INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct +-- !query output +42 +43 +44 + + +-- !query +CREATE TABLE cte_tbl2 (col INT) USING csv +-- !query schema +struct<> +-- !query output + + + +-- !query +WITH s AS (SELECT 45 AS col) +FROM s +INSERT INTO cte_tbl SELECT col +INSERT INTO cte_tbl2 SELECT col +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct +-- !query output +42 +43 +44 +45 + + +-- !query +SELECT * FROM cte_tbl2 +-- !query schema +struct +-- !query output +45 + + +-- !query +DROP TABLE cte_tbl +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP TABLE cte_tbl2 +-- !query schema +struct<> +-- !query output + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 5bf04460f522..eef2ae1f737d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand} 
@@ -112,7 +112,7 @@ case class CreateHiveTableAsSelectCommand( s"TableName: ${tableDesc.identifier.table}]" } - override def withCTE(withCTE: WithCTE): LogicalPlan = { - copy(query = withCTE.copy(plan = this.query)) + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) } } From 45672ac5d8845f5230017d75e7e2e9ef3c5917e4 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 22 Jul 2023 10:43:23 +0800 Subject: [PATCH 8/9] address comments --- .../catalyst/analysis/CTESubstitution.scala | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 6da3e1bb88de..cd174f718928 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -56,27 +56,24 @@ object CTESubstitution extends Rule[LogicalPlan] { return plan } - val forceInline = if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) { - // The legacy behavior always inlines the CTE relations for queries in commands. - plan.exists { - case _: Command | _: ParsedStatement | _: InsertIntoDir => true - case _ => false - } - } else { - val commands = plan.collect { - case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c - } - if (commands.length == 1) { + val commands = plan.collect { + case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c + } + val forceInline = if (commands.length == 1) { + if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) { + // The legacy behavior always inlines the CTE relations for queries in commands. + true + } else { // If there is only one command and it's `CTEInChildren`, we can resolve // CTE normally and don't need to force inline. !commands.head.isInstanceOf[CTEInChildren] - } else if (commands.length > 1) { - // This can happen with the multi-insert statement. We should fall back to - // the legacy behavior. - true - } else { - false } + } else if (commands.length > 1) { + // This can happen with the multi-insert statement. We should fall back to + // the legacy behavior. + true + } else { + false } val cteDefs = ArrayBuffer.empty[CTERelationDef] @@ -304,9 +301,10 @@ object CTESubstitution extends Rule[LogicalPlan] { * children. There are two reasons: * 1. Some rules will pattern match the root command nodes, and we should keep the command * as the root node to not break them. - * 2. `Dataset` eagerly executes the commands inside a query plan. However, the CTE - * references inside commands will be invalid if we execute the command alone, as - * the CTE definitions are outside of the command. + * 2. `Dataset` eagerly executes the commands inside a query plan. For example, + * sql("WITH v ... CREATE TABLE t AS SELECT * FROM v") will create the table instead of just + * analyzing the command. However, the CTE references inside commands will be invalid if we + * execute the command alone, as the CTE definitions are outside of the command.
*/ private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = { p match { From 8a36db13c932dbbb577fa53f8b1350e2ce2f309b Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 24 Jul 2023 20:33:23 -0700 Subject: [PATCH 9/9] Update sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e231884033b0..90a695b5149d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3764,6 +3764,7 @@ object SQLConf { .doc("If true, always inline the CTE relations for the queries in commands. This is the " + "legacy behavior which may produce incorrect results because Spark may evaluate a CTE " + "relation more than once, even if it's nondeterministic.") + .version("4.0.0") .booleanConf .createWithDefault(false)
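
A minimal, self-contained Scala sketch of the `CTEInChildren` contract introduced by this series. The classes below (`Plan`, `CTEDef`, `Relation`, `CreateTableAs`) are simplified stand-ins, not Spark's actual Catalyst types; only the placement logic mirrors the patches: the default `withCTEDefs` wraps every child in `WithCTE`, while a CTAS-like command overrides it to wrap only its query child so the command itself stays at the root of the plan.

// Simplified stand-ins for illustration; not Spark's actual classes.
sealed trait Plan {
  def children: Seq[Plan]
  def withNewChildren(newChildren: Seq[Plan]): Plan
}

case class CTEDef(name: String, body: Plan)

case class WithCTE(plan: Plan, cteDefs: Seq[CTEDef]) extends Plan {
  def children: Seq[Plan] = Seq(plan)
  def withNewChildren(newChildren: Seq[Plan]): Plan = copy(plan = newChildren.head)
}

case class Relation(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

trait CTEInChildren extends Plan {
  // Default placement: push the CTE definitions down onto every child.
  def withCTEDefs(cteDefs: Seq[CTEDef]): Plan =
    withNewChildren(children.map(WithCTE(_, cteDefs)))
}

// A CTAS-like command: it stays at the root and hosts the CTE definitions on
// its query child, so eagerly executing the command still sees the CTEs.
case class CreateTableAs(table: String, query: Plan) extends CTEInChildren {
  def children: Seq[Plan] = Seq(query)
  def withNewChildren(newChildren: Seq[Plan]): Plan = copy(query = newChildren.head)
  override def withCTEDefs(cteDefs: Seq[CTEDef]): Plan =
    copy(query = WithCTE(query, cteDefs))
}

object CTEInChildrenSketch extends App {
  val defs = Seq(CTEDef("s", Relation("one_row")))
  val cmd = CreateTableAs("t", Relation("s"))
  // The command remains the root; WithCTE wraps only its query child:
  // CreateTableAs(t,WithCTE(Relation(s),List(CTEDef(s,Relation(one_row)))))
  println(cmd.withCTEDefs(defs))
}

Keeping the command node on top preserves analyzer rules that pattern-match root commands, and the CTE definitions travel with the query that `Dataset` executes eagerly.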