-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-15752] [SQL] Optimize metadata only query that has an aggregate whose children are deterministic project or filter operators. #13494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 49 commits
2ca2c38
edea710
8426522
153293e
7dfb743
68e6d6d
595ef36
7d7ece0
2e55a9d
b2b6eba
c5a291e
6404c1f
1bb5812
7e3729e
fbf5d61
3411fd6
aefab7f
c5ccdea
ae6cf9f
159331b
3a1438b
c0a7d59
a4045ca
0a023e7
a9b38ab
a5ea995
1bed08d
a22e962
41fef2c
bd53678
88f7308
2568193
26a97f4
4297f9f
1a65aa7
d5e0df4
9d6dd76
9cb01d8
3e2687d
2b4faf3
88fd3bf
a894bb7
9546b40
85b695b
bcfe8e5
67211be
501f93b
8ee2a8c
d888c85
ff16509
358ad13
030776a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution | ||
|
|
||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.expressions.aggregate._ | ||
| import org.apache.spark.sql.catalyst.plans.logical._ | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} | ||
| import org.apache.spark.sql.internal.SQLConf | ||
|
|
||
| /** | ||
| * This rule optimizes the execution of queries that can be answered by looking only at | ||
| * partition-level metadata. This applies when all the columns scanned are partition columns, and | ||
| * the query has an aggregate operator that satisfies the following conditions: | ||
| * 1. aggregate expressions are partition columns. | ||
| * e.g. SELECT col FROM tbl GROUP BY col. | ||
| * 2. aggregate function on partition columns with DISTINCT. | ||
| * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. | ||
| * 3. aggregate function on partition columns which has the same result with or without the DISTINCT keyword. | ||
| * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. | ||
| */ | ||
| case class OptimizeMetadataOnlyQuery( | ||
| catalog: SessionCatalog, | ||
| conf: SQLConf) extends Rule[LogicalPlan] { | ||
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = { | ||
| if (!conf.optimizerMetadataOnly) { | ||
| return plan | ||
| } | ||
|
|
||
| plan.transform { | ||
| case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => | ||
| // We only apply this optimization when only partitioned attributes are scanned. | ||
| if (a.references.subsetOf(partAttrs)) { | ||
| val aggFunctions = aggExprs.flatMap(_.collect { | ||
| case agg: AggregateExpression => agg | ||
| }) | ||
| val isAllDistinctAgg = aggFunctions.forall { agg => | ||
| agg.isDistinct || (agg.aggregateFunction match { | ||
| // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter | ||
| // they have DISTINCT keyword or not, as the result will be same. | ||
| case _: Max => true | ||
| case _: Min => true | ||
| case _: First => true | ||
| case _: Last => true | ||
| case _ => false | ||
| }) | ||
| } | ||
| if (isAllDistinctAgg) { | ||
| a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) | ||
| } else { | ||
| a | ||
| } | ||
| } else { | ||
| a | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Transform the given plan, find its table scan nodes that matches the given relation, and then | ||
| * replace the table scan node with its corresponding partition values. | ||
| */ | ||
| private def replaceTableScanWithPartitionMetadata( | ||
| child: LogicalPlan, | ||
| relation: LogicalPlan): LogicalPlan = { | ||
| child transform { | ||
| case plan if plan eq relation => | ||
| relation match { | ||
| case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => | ||
| val partAttrs = PartitionedRelation.getPartitionAttrs( | ||
|
||
| fsRelation.partitionSchema.map(_.name), l) | ||
| val partitionData = fsRelation.location.listFiles(filters = Nil) | ||
| LocalRelation(partAttrs, partitionData.map(_.values)) | ||
|
|
||
| case relation: CatalogRelation => | ||
| val partAttrs = PartitionedRelation.getPartitionAttrs( | ||
| relation.catalogTable.partitionColumnNames, relation) | ||
| val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p => | ||
| InternalRow.fromSeq(partAttrs.map { attr => | ||
| Cast(Literal(p.spec(attr.name)), attr.dataType).eval() | ||
| }) | ||
| } | ||
| LocalRelation(partAttrs, partitionData) | ||
|
|
||
| case _ => | ||
| throw new IllegalStateException(s"unrecognized table scan node: $relation, " + | ||
| s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try again.") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A pattern that finds the partitioned table relation node inside the given plan, and returns a | ||
| * pair of the partition attributes and the table relation node. | ||
| * | ||
| * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with | ||
| * deterministic expressions, and returns result after reaching the partitioned table relation | ||
| * node. | ||
| */ | ||
| private[execution] object PartitionedRelation { | ||
|
||
|
|
||
| def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match { | ||
| case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) | ||
| if fsRelation.partitionSchema.nonEmpty => | ||
| val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) | ||
| Some(AttributeSet(partAttrs), l) | ||
|
|
||
| case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty => | ||
| val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation) | ||
| Some(AttributeSet(partAttrs), relation) | ||
|
|
||
| case p @ Project(projectList, child) if projectList.forall(_.deterministic) => | ||
| unapply(child).flatMap { case (partAttrs, relation) => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logic here is the same as the Filter case. Move this into a separate method?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because it is only three lines, I think a separate method is unnecessary — it would just add an extra method call. |
||
| if (p.references.subsetOf(partAttrs)) Some(p.outputSet, relation) else None | ||
| } | ||
|
|
||
| case f @ Filter(condition, child) if condition.deterministic => | ||
| unapply(child).flatMap { case (partAttrs, relation) => | ||
| if (f.references.subsetOf(partAttrs)) Some(partAttrs, relation) else None | ||
| } | ||
|
|
||
| case _ => None | ||
| } | ||
|
|
||
| /** | ||
| * Returns the partition attributes of the table relation plan. | ||
| */ | ||
| def getPartitionAttrs(partitionColumnNames: Seq[String], relation: LogicalPlan) | ||
|
||
| : Seq[Attribute] = { | ||
| val partColumns = partitionColumnNames.map(_.toLowerCase).toSet | ||
| relation.output.filter(a => partColumns.contains(a.name.toLowerCase)) | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution | ||
|
|
||
| import org.apache.spark.sql._ | ||
| import org.apache.spark.sql.catalyst.plans.logical.LocalRelation | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.test.SharedSQLContext | ||
|
|
||
| class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext { | ||
| import testImplicits._ | ||
|
|
||
| override def beforeAll(): Unit = { | ||
| super.beforeAll() | ||
| val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd")) | ||
| .toDF("col1", "col2", "partcol1", "partcol2") | ||
| data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart") | ||
| } | ||
|
|
||
| override protected def afterAll(): Unit = { | ||
| try { | ||
| sql("DROP TABLE IF EXISTS srcpart") | ||
| } finally { | ||
| super.afterAll() | ||
| } | ||
| } | ||
|
|
||
| private def assertMetadataOnlyQuery(df: DataFrame): Unit = { | ||
| val localRelations = df.queryExecution.optimizedPlan.collect { | ||
| case l @ LocalRelation(_, _) => l | ||
| } | ||
| assert(localRelations.size == 1) | ||
| } | ||
|
|
||
| private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = { | ||
| val localRelations = df.queryExecution.optimizedPlan.collect { | ||
| case l @ LocalRelation(_, _) => l | ||
| } | ||
| assert(localRelations.size == 0) | ||
| } | ||
|
|
||
| private def testMetadataOnly(name: String, sqls: String*): Unit = { | ||
| test(name) { | ||
| withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") { | ||
| sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) } | ||
| } | ||
| withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") { | ||
| sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def testNotMetadataOnly(name: String, sqls: String*): Unit = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is this one used?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NVM - I am blind |
||
| test(name) { | ||
| withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") { | ||
| sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) } | ||
| } | ||
| withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") { | ||
| sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| testMetadataOnly( | ||
| "OptimizeMetadataOnlyQuery test: aggregate expression is partition columns", | ||
|
||
| "select partcol1 from srcpart group by partcol1", | ||
| "select partcol2 from srcpart where partcol1 = 0 group by partcol2") | ||
|
|
||
| testMetadataOnly( | ||
| "OptimizeMetadataOnlyQuery test: distinct aggregate function on partition columns", | ||
| "SELECT partcol1, count(distinct partcol2) FROM srcpart group by partcol1", | ||
| "SELECT partcol1, count(distinct partcol2) FROM srcpart where partcol1 = 0 group by partcol1") | ||
|
|
||
| testMetadataOnly( | ||
| "OptimizeMetadataOnlyQuery test: distinct on partition columns", | ||
| "select distinct partcol1, partcol2 from srcpart", | ||
| "select distinct c1 from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t") | ||
|
|
||
| testMetadataOnly( | ||
| "OptimizeMetadataOnlyQuery test: aggregate function on partition columns which have same " + | ||
| "result w or w/o DISTINCT keyword.", | ||
| "select max(partcol1) from srcpart", | ||
| "select min(partcol1) from srcpart where partcol1 = 0", | ||
| "select first(partcol1) from srcpart", | ||
| "select last(partcol1) from srcpart where partcol1 = 0", | ||
| "select partcol2, min(partcol1) from srcpart where partcol1 = 0 group by partcol2", | ||
| "select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t") | ||
|
|
||
| testNotMetadataOnly( | ||
| "OptimizeMetadataOnlyQuery test: unsupported for non-partition columns", | ||
| "select col1 from srcpart group by col1", | ||
| "select partcol1, max(col1) from srcpart group by partcol1", | ||
| "select partcol1, count(distinct col1) from srcpart group by partcol1", | ||
| "select distinct partcol1, col1 from srcpart") | ||
|
|
||
| testNotMetadataOnly( | ||
| "OptimizeMetadataOnlyQuery test: unsupported for non-distinct aggregate function on " + | ||
| "partition columns", | ||
| "select partcol1, sum(partcol2) from srcpart group by partcol1", | ||
| "select partcol1, count(partcol2) from srcpart group by partcol1") | ||
|
|
||
| testNotMetadataOnly( | ||
| "OptimizeMetadataOnlyQuery test: unsupported for GroupingSet/Union operator", | ||
| "select partcol1, max(partcol2) from srcpart where partcol1 = 0 group by rollup (partcol1)", | ||
| "select partcol2 from (select partcol2 from srcpart where partcol1 = 0 union all " + | ||
| "select partcol2 from srcpart where partcol1 = 1) t group by partcol2") | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First/Last?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need to handle them. Thanks.