52 commits
2ca2c38
init commit
lianhuiwang Jun 3, 2016
edea710
fix unit test
lianhuiwang Jun 3, 2016
8426522
Merge branch 'apache-master' into metadata-only
lianhuiwang Jun 3, 2016
153293e
fix unit test
lianhuiwang Jun 3, 2016
7dfb743
update
lianhuiwang Jun 24, 2016
68e6d6d
Revert "fix unit test"
lianhuiwang Jun 24, 2016
595ef36
Revert "fix unit test"
lianhuiwang Jun 24, 2016
7d7ece0
Merge branch 'apache-master' into metadata-only
lianhuiwang Jun 24, 2016
2e55a9d
Merge branch 'apache-master' into metadata-only
lianhuiwang Jun 24, 2016
b2b6eba
update
lianhuiwang Jun 24, 2016
c5a291e
Merge branch 'apache-master' into metadata-only
lianhuiwang Jun 24, 2016
6404c1f
update opt for core
lianhuiwang Jun 24, 2016
1bb5812
refactor
lianhuiwang Jun 24, 2016
7e3729e
add ut
lianhuiwang Jun 24, 2016
fbf5d61
fix ut
lianhuiwang Jun 24, 2016
3411fd6
fix project
lianhuiwang Jun 26, 2016
aefab7f
address comments
lianhuiwang Jun 27, 2016
c5ccdea
fix cube/rollup
lianhuiwang Jun 27, 2016
ae6cf9f
fix style
lianhuiwang Jun 27, 2016
159331b
refactor
lianhuiwang Jun 27, 2016
3a1438b
refactor
lianhuiwang Jun 28, 2016
c0a7d59
update
lianhuiwang Jun 29, 2016
a4045ca
add comments
lianhuiwang Jun 29, 2016
0a023e7
fix minor
lianhuiwang Jun 29, 2016
a9b38ab
rename
lianhuiwang Jun 29, 2016
a5ea995
update
lianhuiwang Jun 29, 2016
1bed08d
fix monir
lianhuiwang Jun 29, 2016
a22e962
refactor
lianhuiwang Jul 1, 2016
41fef2c
update
lianhuiwang Jul 1, 2016
bd53678
Merge branch 'apache-master' into metadata-only
lianhuiwang Jul 1, 2016
88f7308
update
lianhuiwang Jul 1, 2016
2568193
add ut
lianhuiwang Jul 1, 2016
26a97f4
address comments
lianhuiwang Jul 4, 2016
4297f9f
update name
lianhuiwang Jul 6, 2016
1a65aa7
address comments
lianhuiwang Jul 6, 2016
d5e0df4
update
lianhuiwang Jul 6, 2016
9d6dd76
update2
lianhuiwang Jul 6, 2016
9cb01d8
update
lianhuiwang Jul 6, 2016
3e2687d
doc improve
cloud-fan Jul 6, 2016
2b4faf3
update
cloud-fan Jul 7, 2016
88fd3bf
Merge pull request #2 from cloud-fan/metadata-only
lianhuiwang Jul 7, 2016
a894bb7
delete cases
lianhuiwang Jul 7, 2016
9546b40
Merge branch 'metadata-only' of https://github.com/lianhuiwang/spark …
lianhuiwang Jul 7, 2016
85b695b
update ut
lianhuiwang Jul 7, 2016
bcfe8e5
Merge branch 'master' of https://github.com/apache/spark into metadat…
lianhuiwang Jul 7, 2016
67211be
Merge branch 'master' of https://github.com/apache/spark into metadat…
lianhuiwang Jul 7, 2016
501f93b
address commetns
lianhuiwang Jul 11, 2016
8ee2a8c
refactor
lianhuiwang Jul 11, 2016
d888c85
fix minor
lianhuiwang Jul 11, 2016
ff16509
update
lianhuiwang Jul 12, 2016
358ad13
remove duplicate code
lianhuiwang Jul 12, 2016
030776a
fix minor
lianhuiwang Jul 12, 2016
12 changes: 12 additions & 0 deletions docs/sql-programming-guide.md
@@ -1476,6 +1476,18 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession`
</p>
</td>
</tr>
<tr>
<td><code>spark.sql.optimizer.metadataOnly</code></td>
<td>true</td>
<td>
<p>
When true, enable the metadata-only query optimization that uses the table's metadata to
produce the partition columns instead of table scans. It applies when all the columns
scanned are partition columns and the query has an aggregate operator that satisfies
distinct semantics.
</p>
</td>
</tr>
</table>
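
A minimal sketch, assuming a hypothetical table `logs` partitioned by `year`, of how the new flag can be exercised from Scala:

```scala
// Only the partition column is referenced, and DISTINCT gives the query
// distinct semantics, so the optimizer can answer it from partition
// metadata without scanning any data files.
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
spark.sql("SELECT DISTINCT year FROM logs").show()

// Set the flag to false to force a regular table scan instead.
spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
```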

## JSON Datasets
152 changes: 152 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf

/**
* This rule optimizes the execution of queries that can be answered by looking only at
* partition-level metadata. This applies when all the columns scanned are partition columns, and
* the query has an aggregate operator that satisfies the following conditions:
* 1. aggregate expressions are partition columns.
* e.g. SELECT col FROM tbl GROUP BY col.
* 2. aggregate function on partition columns with DISTINCT.
* e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
* 3. aggregate function on partition columns which has the same result with or without the
*    DISTINCT keyword.
* e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
*/
case class OptimizeMetadataOnlyQuery(
catalog: SessionCatalog,
conf: SQLConf) extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = {
if (!conf.optimizerMetadataOnly) {
return plan
}

plan.transform {
case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
// We only apply this optimization when only partitioned attributes are scanned.
if (a.references.subsetOf(partAttrs)) {
val aggFunctions = aggExprs.flatMap(_.collect {
case agg: AggregateExpression => agg
})
val isAllDistinctAgg = aggFunctions.forall { agg =>
agg.isDistinct || (agg.aggregateFunction match {
// `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
// they have DISTINCT keyword or not, as the result will be same.
case _: Max => true
[Review thread]
Contributor: First/Last?
lianhuiwang (Author), Jul 11, 2016: Yes, we need to handle them. thanks.

case _: Min => true
case _: First => true
case _: Last => true
case _ => false
})
}
if (isAllDistinctAgg) {
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
} else {
a
}
} else {
a
}
}
}

/**
* Returns the partition attributes of the table relation plan.
*/
private def getPartitionAttrs(
partitionColumnNames: Seq[String],
relation: LogicalPlan): Seq[Attribute] = {
val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
}

/**
* Transform the given plan, find its table scan nodes that match the given relation, and then
* replace the table scan node with its corresponding partition values.
*/
private def replaceTableScanWithPartitionMetadata(
child: LogicalPlan,
relation: LogicalPlan): LogicalPlan = {
child transform {
case plan if plan eq relation =>
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
val partitionData = fsRelation.location.listFiles(filters = Nil)
LocalRelation(partAttrs, partitionData.map(_.values))

case relation: CatalogRelation =>
val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
InternalRow.fromSeq(partAttrs.map { attr =>
Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
})
}
LocalRelation(partAttrs, partitionData)

case _ =>
throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try again.")
}
}
}
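
As an aside, the `Cast(Literal(...), ...).eval()` pattern above converts catalog partition values, which the metastore stores as strings, into values of the partition column's actual type. A minimal standalone sketch of the same conversion:

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.IntegerType

// A partition spec maps column names to string values, e.g. "partcol1" -> "1".
// Casting the string literal to the column's data type yields the typed value.
val typed = Cast(Literal("1"), IntegerType).eval()  // evaluates to the Int 1
```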

/**
* A pattern that finds the partitioned table relation node inside the given plan, and returns a
* pair of the partition attributes and the table relation node.
*
* It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
* deterministic expressions, and returns the result after reaching the partitioned table relation
* node.
*/
object PartitionedRelation {

def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
if fsRelation.partitionSchema.nonEmpty =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some(AttributeSet(partAttrs), l)

case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty =>
val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
Some(AttributeSet(partAttrs), relation)

case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
unapply(child).flatMap { case (partAttrs, relation) =>
[Review thread]
Contributor: Logic here is the same as the Filter case. Move this into a separate method?
lianhuiwang (Author): Since it is only three lines, I think a separate method is unnecessary; it would just add an extra method call.

if (p.references.subsetOf(partAttrs)) Some(p.outputSet, relation) else None
}

case f @ Filter(condition, child) if condition.deterministic =>
unapply(child).flatMap { case (partAttrs, relation) =>
if (f.references.subsetOf(partAttrs)) Some(partAttrs, relation) else None
}

case _ => None
}
}
}
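
Whether the rule fired for a given query can be observed by looking for the substituted `LocalRelation` in the optimized plan, which is the same check the test suite below uses. A minimal sketch, assuming a `spark` session and the partitioned `srcpart` table from the tests:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val df = spark.sql("select max(partcol1) from srcpart")
// After the metadata-only rewrite, the table scan is replaced by a
// LocalRelation holding one row per partition value.
val usedMetadataOnly = df.queryExecution.optimizedPlan.collect {
  case l: LocalRelation => l
}.nonEmpty
```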
1 change: 1 addition & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -30,6 +30,7 @@ class SparkOptimizer(
extends Optimizer(catalog, conf) {

override def batches: Seq[Batch] = super.batches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
}
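
For context, the new batch runs `Once`, ahead of the user-provided rules. Custom optimizations still plug in through `ExperimentalMethods`; a minimal sketch with a hypothetical no-op rule `MyRule`:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule that leaves the plan unchanged, shown only to
// illustrate the "User Provided Optimizers" extension point.
object MyRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations = Seq(MyRule)
```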
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -258,6 +258,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
"scanned are partition columns and the query has an aggregate operator that satisfies " +
"distinct semantics.")
.booleanConf
.createWithDefault(true)

val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
.doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
.stringConf
@@ -594,6 +602,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)

def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
36 changes: 36 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2896,4 +2896,40 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
sql(s"SELECT '$literal' AS DUMMY"),
Row(s"$expected") :: Nil)
}

test("SPARK-15752 optimize metadata only query for datasource table") {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
withTable("srcpart_15752") {
val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "a" else "b"))
.toDF("col1", "col2", "partcol1", "partcol2")
data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart_15752")
checkAnswer(
sql("select partcol1 from srcpart_15752 group by partcol1"),
Row(0) :: Row(1) :: Nil)
checkAnswer(
sql("select partcol1 from srcpart_15752 where partcol1 = 1 group by partcol1"),
Row(1))
checkAnswer(
sql("select partcol1, count(distinct partcol2) from srcpart_15752 group by partcol1"),
Row(0, 1) :: Row(1, 1) :: Nil)
checkAnswer(
sql("select partcol1, count(distinct partcol2) from srcpart_15752 where partcol1 = 1 " +
"group by partcol1"),
Row(1, 1) :: Nil)
checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(0) :: Row(1) :: Nil)
checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
checkAnswer(
sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
"where partcol1 = 1) t"),
Row(2))
checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(1))
checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 1"), Row(1))
checkAnswer(sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"), Row(1))
checkAnswer(
sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
"where partcol1 = 1) t"),
Row(2))
}
}
}
}
120 changes: 120 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
import testImplicits._

override def beforeAll(): Unit = {
super.beforeAll()
val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
.toDF("col1", "col2", "partcol1", "partcol2")
data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
}

override protected def afterAll(): Unit = {
try {
sql("DROP TABLE IF EXISTS srcpart")
} finally {
super.afterAll()
}
}

private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
val localRelations = df.queryExecution.optimizedPlan.collect {
case l @ LocalRelation(_, _) => l
}
assert(localRelations.size == 1)
}

private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
val localRelations = df.queryExecution.optimizedPlan.collect {
case l @ LocalRelation(_, _) => l
}
assert(localRelations.size == 0)
}

private def testMetadataOnly(name: String, sqls: String*): Unit = {
test(name) {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
}
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
}
}
}

private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
[Review thread]
Contributor: When is this one used?
Contributor: NVM - I am blind
test(name) {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
}
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
}
}
}

testMetadataOnly(
"Aggregate expression is partition columns",
"select partcol1 from srcpart group by partcol1",
"select partcol2 from srcpart where partcol1 = 0 group by partcol2")

testMetadataOnly(
"Distinct aggregate function on partition columns",
"SELECT partcol1, count(distinct partcol2) FROM srcpart group by partcol1",
"SELECT partcol1, count(distinct partcol2) FROM srcpart where partcol1 = 0 group by partcol1")

testMetadataOnly(
"Distinct on partition columns",
"select distinct partcol1, partcol2 from srcpart",
"select distinct c1 from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")

testMetadataOnly(
"Aggregate function on partition columns which have same result w or w/o DISTINCT keyword",
"select max(partcol1) from srcpart",
"select min(partcol1) from srcpart where partcol1 = 0",
"select first(partcol1) from srcpart",
"select last(partcol1) from srcpart where partcol1 = 0",
"select partcol2, min(partcol1) from srcpart where partcol1 = 0 group by partcol2",
"select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")

testNotMetadataOnly(
"Don't optimize metadata only query for non-partition columns",
"select col1 from srcpart group by col1",
"select partcol1, max(col1) from srcpart group by partcol1",
"select partcol1, count(distinct col1) from srcpart group by partcol1",
"select distinct partcol1, col1 from srcpart")

testNotMetadataOnly(
"Don't optimize metadata only query for non-distinct aggregate function on partition columns",
"select partcol1, sum(partcol2) from srcpart group by partcol1",
"select partcol1, count(partcol2) from srcpart group by partcol1")

testNotMetadataOnly(
"Don't optimize metadata only query for GroupingSet/Union operator",
"select partcol1, max(partcol2) from srcpart where partcol1 = 0 group by rollup (partcol1)",
"select partcol2 from (select partcol2 from srcpart where partcol1 = 0 union all " +
"select partcol2 from srcpart where partcol1 = 1) t group by partcol2")
}