-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-15752] [SQL] Optimize metadata only query that has an aggregate whose children are deterministic project or filter operators. #13494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
2ca2c38
edea710
8426522
153293e
7dfb743
68e6d6d
595ef36
7d7ece0
2e55a9d
b2b6eba
c5a291e
6404c1f
1bb5812
7e3729e
fbf5d61
3411fd6
aefab7f
c5ccdea
ae6cf9f
159331b
3a1438b
c0a7d59
a4045ca
0a023e7
a9b38ab
a5ea995
1bed08d
a22e962
41fef2c
bd53678
88f7308
2568193
26a97f4
4297f9f
1a65aa7
d5e0df4
9d6dd76
9cb01d8
3e2687d
2b4faf3
88fd3bf
a894bb7
9546b40
85b695b
bcfe8e5
67211be
501f93b
8ee2a8c
d888c85
ff16509
358ad13
030776a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,9 +27,11 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} | |
| import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} | ||
| import org.apache.spark.sql.catalyst.catalog._ | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.expressions.aggregate._ | ||
| import org.apache.spark.sql.catalyst.plans.logical | ||
| import org.apache.spark.sql.catalyst.plans.logical._ | ||
| import org.apache.spark.sql.catalyst.rules._ | ||
| import org.apache.spark.sql.execution.LogicalRDD | ||
| import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ | ||
| import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan | ||
| import org.apache.spark.sql.execution.datasources.{Partition => _, _} | ||
|
|
@@ -506,6 +508,119 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log | |
| } | ||
| } | ||
|
|
||
/**
 * When only partition columns are scanned, answer the query from catalog metadata instead of
 * scanning files. The rewrite applies to an [[Aggregate]] (or [[Distinct]] over [[Project]])
 * whose referenced columns and filters involve only partition columns, and whose aggregate
 * functions are either distinct aggregations (only the set of values matters) or
 * duplicate-insensitive functions such as `Max`/`Min`, e.g. `SELECT max(partition) FROM t`.
 * The rewrite is gated by the `spark.sql.optimizer.metadataOnly` configuration.
 */
object MetadataOnlyOptimizer extends Rule[LogicalPlan] {

  /**
   * Whether all aggregate expressions in `a` can be computed from partition metadata alone.
   *
   * Metadata only exposes the distinct partition values (duplicates within a partition are
   * invisible), so an aggregate qualifies when it is either:
   *  - a distinct aggregation (`agg.isDistinct`), which by definition ignores duplicates; or
   *  - `Max`/`Min`, whose results are unaffected by duplicate values.
   */
  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
    val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
      expr.collect {
        case agg: AggregateExpression => agg
      }
    }.distinct
    aggregateExpressions.forall { agg =>
      if (agg.isDistinct) {
        true
      } else {
        // Only duplicate-insensitive aggregate functions are safe on metadata-only rows.
        agg.aggregateFunction match {
          case _: Max => true
          case _: Min => true
          case _ => false
        }
      }
    }
  }

  /**
   * Descends through deterministic Project/Filter (and SubqueryAlias) operators to locate a
   * partitioned relation, collecting every filter condition seen on the way down.
   *
   * Non-deterministic projections or filter conditions stop the search: re-evaluating them
   * over metadata-only rows could produce different results than evaluating them over the
   * original file scan, so such plans must not be rewritten.
   *
   * @return the partitioned relation (if any) paired with the collected filter conditions.
   */
  private def findRelation(plan: LogicalPlan): (Option[LogicalPlan], Seq[Expression]) = {
    plan match {
      case relation @ LogicalRelation(files: HadoopFsRelation, _, _)
          if files.partitionSchema.nonEmpty =>
        (Some(relation), Seq.empty[Expression])

      case relation: MetastoreRelation if relation.partitionKeys.nonEmpty =>
        (Some(relation), Seq.empty[Expression])

      // Only look through a Project whose expressions are all deterministic.
      case Project(projectList, child) if projectList.forall(_.deterministic) =>
        findRelation(child)

      // Likewise, only deterministic filter conditions may be collected and re-applied.
      case Filter(filterCondition, child) if filterCondition.deterministic =>
        val (plan, conditions) = findRelation(child)
        (plan, conditions ++ Seq(filterCondition))

      case SubqueryAlias(_, child) =>
        findRelation(child)

      case _ => (None, Seq.empty[Expression])
    }
  }

  /**
   * Rewrites `parent` so that it reads partition values from a metadata-backed [[LogicalRDD]]
   * instead of scanning `relation`. The rewrite only fires when every column referenced by
   * `parent` and by `filters` is a partition column; otherwise `parent` is returned unchanged.
   */
  private def convertToMetadataOnlyPlan(
      parent: LogicalPlan,
      filters: Seq[Expression],
      relation: LogicalPlan): LogicalPlan = relation match {
    case l @ LogicalRelation(files: HadoopFsRelation, _, _) =>
      val attributeMap = l.output.map(attr => (attr.name, attr)).toMap
      val partitionColumns = files.partitionSchema.map { field =>
        attributeMap.getOrElse(field.name, throw new AnalysisException(
          s"Unable to resolve ${field.name} given [${l.output.map(_.name).mkString(", ")}]"))
      }
      val filterColumns = filters.flatMap(_.references)
      val projectSet = parent.references ++ AttributeSet(filterColumns)
      if (projectSet.subsetOf(AttributeSet(partitionColumns))) {
        // All referenced columns are partition columns: the file index can both prune with
        // the (partition-only) filters and supply the partition values directly.
        val selectedPartitions = files.location.listFiles(filters)
        val partitionValues = selectedPartitions.map(_.values)
        val valuesRdd = sparkSession.sparkContext.parallelize(partitionValues, 1)
        parent.withNewChildren(LogicalRDD(partitionColumns, valuesRdd)(sparkSession) :: Nil)
      } else {
        parent
      }

    case relation: MetastoreRelation =>
      // Mirror the LogicalRelation branch above: the filters are re-applied below over a
      // plan that only outputs partition keys, so their references must also resolve
      // against the partition keys.
      val filterColumns = filters.flatMap(_.references)
      val projectSet = parent.references ++ AttributeSet(filterColumns)
      if (projectSet.subsetOf(AttributeSet(relation.partitionKeys))) {
        val partitionColumnDataTypes = relation.partitionKeys.map(_.dataType)
        // Hive stores partition values as raw strings; cast each to its declared type.
        val partitionValues = relation.getHiveQlPartitions(filters).map { p =>
          InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
            case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
          })
        }
        val valuesRdd = sparkSession.sparkContext.parallelize(partitionValues, 1)
        val valuesPlan = LogicalRDD(relation.partitionKeys, valuesRdd)(sparkSession)
        // getHiveQlPartitions may prune only partially, so re-apply all filters on top.
        val child = filters.reduceLeftOption(And).map(Filter(_, valuesPlan)).getOrElse(valuesPlan)
        parent.withNewChildren(child :: Nil)
      } else {
        parent
      }

    case _ =>
      parent
  }

  /**
   * Applies the metadata-only rewrite when `spark.sql.optimizer.metadataOnly` is enabled,
   * handling both plain aggregates and `Distinct(Project(...))` plans.
   */
  def apply(plan: LogicalPlan): LogicalPlan = {
    if (!sparkSession.sessionState.conf.optimizerMetadataOnly) {
      plan
    } else {
      plan.transform {
        case a @ Aggregate(_, _, child) if canSupportMetadataOnly(a) =>
          val (plan, filters) = findRelation(child)
          if (plan.isDefined) {
            convertToMetadataOnlyPlan(a, filters, plan.get)
          } else {
            a
          }

        case d @ Distinct(p @ Project(_, _)) =>
          // DISTINCT only needs the set of values, which partition metadata provides exactly.
          val (plan, filters) = findRelation(p)
          if (plan.isDefined) {
            d.withNewChildren(convertToMetadataOnlyPlan(p, filters, plan.get) :: Nil)
          } else {
            d
          }
      }
    }
  }
}
|
|
||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add more comments to explain the conditions that enable the metadata-only optimization? E.g. the aggregate expression must only reference partition columns; all distinct aggregate functions are supported; for non-distinct aggregates only
`Max` and `Min` are supported; etc.