
Commit 93cf217

SPARK-23877: Fix edge case with unusual project/filter order.
1 parent 6e0685e commit 93cf217
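The edge case: the metadata-only optimization matches an Aggregate over Project/Filter chains above a partitioned relation, but when a Filter sits above a Project that derives a new column from a partition column, the filter's predicate must not be sent to the metastore as a partition predicate. Below is a minimal sketch of the query shape this targets, assuming a Hive-partitioned table like the one the new test creates; the object name, app name, and the existing metastore table are illustrative assumptions, not part of the commit.

import org.apache.spark.sql.SparkSession

// Illustrative names; the table layout matches the one the new test creates:
//   CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)
object Spark23877QueryShape {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-23877 query shape")
      .config("spark.sql.optimizer.metadataOnly", "true")
      .enableHiveSupport()
      .getOrCreate()

    // The filter is on `x`, which is derived from the partition column `part`.
    // Its predicate must not be pushed to the metastore as a partition predicate;
    // before this fix, the PartitionedRelation extractor lost track of which
    // attributes were real partition columns once a Project renamed or derived them.
    spark.sql(
      """SELECT DISTINCT x
        |FROM (SELECT part + 1 AS x FROM metadata_only) t
        |WHERE x < 5""".stripMargin).show()

    spark.stop()
  }
}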

File tree

2 files changed: +48 -16 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 18 additions & 12 deletions
@@ -49,9 +49,9 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
     }

     plan.transform {
-      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, filters, relation)) =>
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(_, attrs, filters, rel)) =>
         // We only apply this optimization when only partitioned attributes are scanned.
-        if (a.references.subsetOf(partAttrs)) {
+        if (a.references.subsetOf(attrs)) {
           val aggFunctions = aggExprs.flatMap(_.collect {
             case agg: AggregateExpression => agg
           })
@@ -67,7 +67,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
             })
           }
           if (isAllDistinctAgg) {
-            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation, filters)))
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
           } else {
             a
           }
@@ -159,26 +159,32 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
    */
   object PartitionedRelation extends PredicateHelper {

-    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
+    def unapply(
+        plan: LogicalPlan): Option[(AttributeSet, AttributeSet, Seq[Expression], LogicalPlan)] = {
       plan match {
         case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
             if fsRelation.partitionSchema.nonEmpty =>
-          val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-          Some((AttributeSet(partAttrs), Nil, l))
+          val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
+          Some((partAttrs, partAttrs, Nil, l))

         case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-          val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-          Some((AttributeSet(partAttrs), Nil, relation))
+          val partAttrs = AttributeSet(
+            getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
+          Some((partAttrs, partAttrs, Nil, relation))

         case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-          unapply(child).flatMap { case (partAttrs, filters, relation) =>
-            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
+          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
+            if (p.references.subsetOf(attrs)) {
+              Some((partAttrs, p.outputSet, filters, relation))
+            } else {
+              None
+            }
           }

         case f @ Filter(condition, child) if condition.deterministic =>
-          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
             if (f.references.subsetOf(partAttrs)) {
-              Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation))
+              Some((partAttrs, attrs, splitConjunctivePredicates(condition) ++ filters, relation))
             } else {
               None
             }
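The substance of the change is in PartitionedRelation.unapply, which now returns two attribute sets instead of one: the relation's actual partition attributes (the only thing filter predicates may be pushed against) and the attributes currently visible after any projections (used to check what the aggregate reads). Below is a standalone sketch of that contract using simplified stand-in types rather than Spark's Catalyst classes; every name and type in it is illustrative, not the committed code.

object PartitionedRelationSketch {

  // Hypothetical, simplified stand-ins for Catalyst plan nodes and attribute sets.
  sealed trait Plan
  case class Relation(name: String, partitionCols: Set[String], otherCols: Set[String]) extends Plan
  case class Project(output: Set[String], references: Set[String], child: Plan) extends Plan
  case class Filter(references: Set[String], child: Plan) extends Plan

  object PartitionedRelation {
    // Returns (partition attributes, currently visible attributes, collected filter refs, relation).
    def unapply(plan: Plan): Option[(Set[String], Set[String], Seq[Set[String]], Relation)] =
      plan match {
        case r: Relation if r.partitionCols.nonEmpty =>
          // At the relation itself, both attribute sets start out as the partition columns.
          Some((r.partitionCols, r.partitionCols, Nil, r))

        case Project(output, refs, child) =>
          unapply(child).flatMap { case (partAttrs, attrs, filters, rel) =>
            // A project may rename or derive columns, so only `attrs` is updated;
            // `partAttrs` keeps pointing at the relation's true partition columns.
            if (refs.subsetOf(attrs)) Some((partAttrs, output, filters, rel)) else None
          }

        case Filter(refs, child) =>
          unapply(child).flatMap { case (partAttrs, attrs, filters, rel) =>
            // Only predicates over the original partition columns are collected for
            // metastore pushdown; a filter on a derived column fails this check.
            if (refs.subsetOf(partAttrs)) Some((partAttrs, attrs, refs +: filters, rel)) else None
          }

        case _ => None
      }
  }

  def main(args: Array[String]): Unit = {
    val rel = Relation("metadata_only", partitionCols = Set("part"), otherCols = Set("id", "data"))

    // Filter directly on the partition column: the predicate is collected for pushdown.
    println(PartitionedRelation.unapply(Filter(Set("part"), rel)).map(_._3.size)) // Some(1)

    // Filter on a column derived from the partition column ("part + 1 AS x"):
    // the extractor returns None, so the metadata-only rewrite does not fire at all.
    println(PartitionedRelation.unapply(Filter(Set("x"), Project(Set("x"), Set("part"), rel)))) // None
  }
}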

sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala

Lines changed: 30 additions & 4 deletions
@@ -21,22 +21,48 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

 class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleton
     with BeforeAndAfter with SQLTestUtils {

+  import spark.implicits._
+
+  before {
+    sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
+    (0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
+  }
+
   test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
     withTable("metadata_only") {
-      sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
-      (0 to 100).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
+      val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount

       // verify the number of matching partitions
-      assert(sql("SELECT DISTINCT part FROM metadata_only WHERE part < 10").collect.size === 10)
+      assert(sql("SELECT DISTINCT part FROM metadata_only WHERE part < 5").collect().length === 5)

       // verify that the partition predicate was pushed down to the metastore
-      assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount === 10)
+      assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount === 5)
+    }
+  }
+
+  test("SPARK-23877: filter on projected expression") {
+    withTable("metadata_only") {
+      val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
+
+      // verify the matching partitions
+      val partitions = spark.internalCreateDataFrame(Distinct(Filter(($"x" < 5).expr,
+        Project(Seq(($"part" + 1).as("x").expr.asInstanceOf[NamedExpression]),
+          spark.table("metadata_only").logicalPlan.asInstanceOf[SubqueryAlias].child)))
+        .queryExecution.toRdd, StructType(Seq(StructField("x", IntegerType))))
+
+      checkAnswer(partitions, Seq(1, 2, 3, 4).toDF("x"))
+
+      // verify that the partition predicate was not pushed down to the metastore
+      assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount == 11)
     }
   }
 }
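Both tests measure pushdown indirectly, by taking the delta of HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED around the query: 5 partitions fetched when the predicate reaches the metastore, all 11 when it cannot. Below is a hypothetical helper (not part of the commit) that captures that start/end counter pattern as a sketch only.

import org.apache.spark.metrics.source.HiveCatalogMetrics

object PartitionFetchCounter {
  // Run `action` and return its result along with how many partitions the
  // Hive metastore reported fetching while it ran.
  def partitionsFetchedDuring[T](action: => T): (T, Long) = {
    val start = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
    val result = action
    (result, HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - start)
  }
}

// Example use inside a test body (hypothetical):
//   val (rows, fetched) = PartitionFetchCounter.partitionsFetchedDuring {
//     sql("SELECT DISTINCT part FROM metadata_only WHERE part < 5").collect()
//   }
//   assert(fetched === 5)  // predicate pushed to the metastore; only matching partitions listed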
