Pull request (closed). 52 commits; the diff below shows the changes from one commit.
2ca2c38 init commit (lianhuiwang, Jun 3, 2016)
edea710 fix unit test (lianhuiwang, Jun 3, 2016)
8426522 Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 3, 2016)
153293e fix unit test (lianhuiwang, Jun 3, 2016)
7dfb743 update (lianhuiwang, Jun 24, 2016)
68e6d6d Revert "fix unit test" (lianhuiwang, Jun 24, 2016)
595ef36 Revert "fix unit test" (lianhuiwang, Jun 24, 2016)
7d7ece0 Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 24, 2016)
2e55a9d Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 24, 2016)
b2b6eba update (lianhuiwang, Jun 24, 2016)
c5a291e Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 24, 2016)
6404c1f update opt for core (lianhuiwang, Jun 24, 2016)
1bb5812 refactor (lianhuiwang, Jun 24, 2016)
7e3729e add ut (lianhuiwang, Jun 24, 2016)
fbf5d61 fix ut (lianhuiwang, Jun 24, 2016)
3411fd6 fix project (lianhuiwang, Jun 26, 2016)
aefab7f address comments (lianhuiwang, Jun 27, 2016)
c5ccdea fix cube/rollup (lianhuiwang, Jun 27, 2016)
ae6cf9f fix style (lianhuiwang, Jun 27, 2016)
159331b refactor (lianhuiwang, Jun 27, 2016)
3a1438b refactor (lianhuiwang, Jun 28, 2016)
c0a7d59 update (lianhuiwang, Jun 29, 2016)
a4045ca add comments (lianhuiwang, Jun 29, 2016)
0a023e7 fix minor (lianhuiwang, Jun 29, 2016)
a9b38ab rename (lianhuiwang, Jun 29, 2016)
a5ea995 update (lianhuiwang, Jun 29, 2016)
1bed08d fix monir (lianhuiwang, Jun 29, 2016)
a22e962 refactor (lianhuiwang, Jul 1, 2016)
41fef2c update (lianhuiwang, Jul 1, 2016)
bd53678 Merge branch 'apache-master' into metadata-only (lianhuiwang, Jul 1, 2016)
88f7308 update (lianhuiwang, Jul 1, 2016)
2568193 add ut (lianhuiwang, Jul 1, 2016)
26a97f4 address comments (lianhuiwang, Jul 4, 2016)
4297f9f update name (lianhuiwang, Jul 6, 2016)
1a65aa7 address comments (lianhuiwang, Jul 6, 2016)
d5e0df4 update (lianhuiwang, Jul 6, 2016)
9d6dd76 update2 (lianhuiwang, Jul 6, 2016)
9cb01d8 update (lianhuiwang, Jul 6, 2016)
3e2687d doc improve (cloud-fan, Jul 6, 2016)
2b4faf3 update (cloud-fan, Jul 7, 2016)
88fd3bf Merge pull request #2 from cloud-fan/metadata-only (lianhuiwang, Jul 7, 2016)
a894bb7 delete cases (lianhuiwang, Jul 7, 2016)
9546b40 Merge branch 'metadata-only' of https://github.com/lianhuiwang/spark … (lianhuiwang, Jul 7, 2016)
85b695b update ut (lianhuiwang, Jul 7, 2016)
bcfe8e5 Merge branch 'master' of https://github.com/apache/spark into metadat… (lianhuiwang, Jul 7, 2016)
67211be Merge branch 'master' of https://github.com/apache/spark into metadat… (lianhuiwang, Jul 7, 2016)
501f93b address commetns (lianhuiwang, Jul 11, 2016)
8ee2a8c refactor (lianhuiwang, Jul 11, 2016)
d888c85 fix minor (lianhuiwang, Jul 11, 2016)
ff16509 update (lianhuiwang, Jul 12, 2016)
358ad13 remove duplicate code (lianhuiwang, Jul 12, 2016)
030776a fix minor (lianhuiwang, Jul 12, 2016)
Viewing commit c0a7d59b014f55b348aa5ed5b089ee87936d2413: "update" (lianhuiwang committed Jun 29, 2016)
@@ -292,14 +292,15 @@ class Analyzer(

       // Ensure all the expressions have been resolved.
       case x: GroupingSets if x.expressions.forall(_.resolved) =>
-        val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)()
+        val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)(
+          isPartitionColumn = true)

         // Expand works by setting grouping expressions to null as determined by the bitmasks. To
         // prevent these null values from being used in an aggregate instead of the original value
         // we need to create new aliases for all group by expressions that will only be used for
         // the intended purpose.
         val groupByAliases: Seq[Alias] = x.groupByExprs.map {
-          case e: NamedExpression => Alias(e, e.name)()
+          case e: NamedExpression => Alias(e, e.name)(isPartitionColumn = e.isPartitionColumn)
           case other => Alias(other, other.toString)()
         }
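For context on this change: with GROUP BY cube(col), the analyzer injects the virtual grouping-id column gid alongside the re-aliased group-by expressions, so both must carry the flag; otherwise the aggregate's references would contain a non-partition attribute and the metadata-only rule below would reject an otherwise eligible query. A minimal sketch (the `spark` session and table `tbl`, partitioned by `col`, are hypothetical):

// Both queries should stay eligible for the metadata-only optimization;
// the second only works because gid and the group-by aliases are tagged.
spark.sql("SELECT col FROM tbl GROUP BY col")
spark.sql("SELECT col FROM tbl GROUP BY cube(col)")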
@@ -83,6 +83,9 @@ trait NamedExpression extends Expression {

   /** Returns true if the expression is generated by Catalyst */
   def isGenerated: java.lang.Boolean = false

+  /** Returns true if the expression is a partition column */
+  def isPartitionColumn: java.lang.Boolean = false
+
   /** Returns a copy of this expression with a new `exprId`. */
   def newInstance(): NamedExpression
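A minimal sketch of how the new flag behaves (column names hypothetical): it defaults to false, is set when an attribute is created from a partition column, and survives aliasing when propagated as in the Analyzer change above.

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.StringType

val dt = AttributeReference("dt", StringType, nullable = false)(isPartitionColumn = true)
val data = AttributeReference("value", StringType, nullable = true)()
assert(dt.isPartitionColumn && !data.isPartitionColumn)

// Propagating the flag through an Alias, as ResolveGroupingAnalytics now does:
val aliased = Alias(dt, "dt_alias")(isPartitionColumn = dt.isPartitionColumn)
assert(aliased.toAttribute.isPartitionColumn)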

@@ -127,12 +130,14 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn
  *                  tableName and subQueryAlias are possible qualifiers.
  * @param explicitMetadata Explicit metadata associated with this alias that overwrites child's.
  * @param isGenerated A flag to indicate if this alias is generated by Catalyst
+ * @param isPartitionColumn A flag to indicate if this alias is a partition column
  */
 case class Alias(child: Expression, name: String)(
     val exprId: ExprId = NamedExpression.newExprId,
     val qualifier: Option[String] = None,
     val explicitMetadata: Option[Metadata] = None,
-    override val isGenerated: java.lang.Boolean = false)
+    override val isGenerated: java.lang.Boolean = false,
+    override val isPartitionColumn: java.lang.Boolean = false)
   extends UnaryExpression with NamedExpression {

   // Alias(Generator, xx) need to be transformed into Generate(generator, ...)
@@ -158,12 +163,13 @@ case class Alias(child: Expression, name: String)(

   def newInstance(): NamedExpression =
     Alias(child, name)(
-      qualifier = qualifier, explicitMetadata = explicitMetadata, isGenerated = isGenerated)
+      qualifier = qualifier, explicitMetadata = explicitMetadata, isGenerated = isGenerated,
+      isPartitionColumn = isPartitionColumn)

   override def toAttribute: Attribute = {
     if (resolved) {
       AttributeReference(name, child.dataType, child.nullable, metadata)(
-        exprId, qualifier, isGenerated)
+        exprId, qualifier, isGenerated, isPartitionColumn)
     } else {
       UnresolvedAttribute(name)
     }
@@ -172,7 +178,7 @@ case class Alias(child: Expression, name: String)(
   override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"

   override protected final def otherCopyArgs: Seq[AnyRef] = {
-    exprId :: qualifier :: explicitMetadata :: isGenerated :: Nil
+    exprId :: qualifier :: explicitMetadata :: isGenerated :: isPartitionColumn :: Nil
   }

   override def hashCode(): Int = {
@@ -206,6 +212,7 @@ case class Alias(child: Expression, name: String)(
  *                  qualified way. Consider the examples tableName.name, subQueryAlias.name.
  *                  tableName and subQueryAlias are possible qualifiers.
  * @param isGenerated A flag to indicate if this reference is generated by Catalyst
+ * @param isPartitionColumn A flag to indicate if this reference is a partition column
  */
 case class AttributeReference(
     name: String,
@@ -214,7 +221,8 @@ case class AttributeReference(
     override val metadata: Metadata = Metadata.empty)(
     val exprId: ExprId = NamedExpression.newExprId,
     val qualifier: Option[String] = None,
-    override val isGenerated: java.lang.Boolean = false)
+    override val isGenerated: java.lang.Boolean = false,
+    override val isPartitionColumn: java.lang.Boolean = false)
   extends Attribute with Unevaluable {

   /**
@@ -252,7 +260,7 @@ case class AttributeReference(

   override def newInstance(): AttributeReference =
     AttributeReference(name, dataType, nullable, metadata)(
-      qualifier = qualifier, isGenerated = isGenerated)
+      qualifier = qualifier, isGenerated = isGenerated, isPartitionColumn = isPartitionColumn)

   /**
    * Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -261,15 +269,17 @@ case class AttributeReference(
     if (nullable == newNullability) {
       this
     } else {
-      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier, isGenerated)
+      AttributeReference(name, dataType, newNullability, metadata)(
+        exprId, qualifier, isGenerated, isPartitionColumn)
     }
   }

   override def withName(newName: String): AttributeReference = {
     if (name == newName) {
       this
     } else {
-      AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier, isGenerated)
+      AttributeReference(newName, dataType, nullable, metadata)(
+        exprId, qualifier, isGenerated, isPartitionColumn)
     }
   }

@@ -280,20 +290,27 @@ case class AttributeReference(
     if (newQualifier == qualifier) {
       this
     } else {
-      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier, isGenerated)
+      AttributeReference(name, dataType, nullable, metadata)(
+        exprId, newQualifier, isGenerated, isPartitionColumn)
     }
   }

   def withExprId(newExprId: ExprId): AttributeReference = {
     if (exprId == newExprId) {
       this
     } else {
-      AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier, isGenerated)
+      AttributeReference(name, dataType, nullable, metadata)(
+        newExprId, qualifier, isGenerated, isPartitionColumn)
     }
   }

+  def setPartitionColumn(isPartitionColumn: Boolean): AttributeReference = {
+    AttributeReference(name, dataType, nullable, metadata)(
+      exprId, qualifier, isGenerated, isPartitionColumn)
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = {
-    exprId :: qualifier :: isGenerated :: Nil
+    exprId :: qualifier :: isGenerated :: isPartitionColumn :: Nil
   }

   override def toString: String = s"$name#${exprId.id}$typeSuffix"
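setPartitionColumn rebuilds the attribute rather than mutating it: Catalyst expressions are immutable, and because the flag lives in the second (curried) parameter list it is not covered by the case-class copy(), which is also why otherCopyArgs must list it. A usage sketch (names hypothetical):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

val attr = AttributeReference("dt", StringType, nullable = false)()
val flagged = attr.setPartitionColumn(true)
// The attribute's identity (exprId) is preserved; only the flag changes.
assert(flagged.isPartitionColumn && flagged.exprId == attr.exprId)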
@@ -28,11 +28,10 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela

 /**
  * When scanning only partition columns, get results based on metadata without scanning files.
- * It is used for distinct, distinct aggregations or distinct-like aggregations(example: Max/Min).
- * First of all, scanning only partition columns are required, then the rule does the following
- * things here:
+ * It's used for operators that only need distinct values. Currently only [[Aggregate]] operator
+ * which satisfy the following conditions are supported:
  * 1. aggregate expression is partition columns,
- *    e.g. SELECT col FROM tbl GROUP BY col or SELECT col FROM tbl GROUP BY cube(col).
+ *    e.g. SELECT col FROM tbl GROUP BY col, SELECT col FROM tbl GROUP BY cube(col).
  * 2. aggregate function on partition columns with DISTINCT,
  *    e.g. SELECT count(DISTINCT col) FROM tbl GROUP BY col.
Review comment (Contributor): This example is wrong; we cannot aggregate on grouping columns. It should be SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.

 * 3. aggregate function on partition columns which have same result with DISTINCT keyword.
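Putting the three supported shapes together (a sketch; the `spark` session, table `tbl`, and its partition columns col, col1, col2 are hypothetical, with case 2 following the corrected example from the review comment above):

spark.sql("SELECT col FROM tbl GROUP BY col")                   // 1. grouping only on partition columns
spark.sql("SELECT count(DISTINCT col1) FROM tbl GROUP BY col2") // 2. DISTINCT aggregate on a partition column
spark.sql("SELECT max(col1) FROM tbl GROUP BY col2")            // 3. max gives the same result with or without DISTINCT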
@@ -43,36 +42,40 @@ case class MetadataOnlyOptimizer(
     catalog: SessionCatalog) extends Rule[LogicalPlan] {

   private def canSupportMetadataOnly(a: Aggregate): Boolean = {
Review comment (Contributor): Here are my thoughts about the optimizable cases. First of all, only partition columns may be required (which means we need to traverse down the plan tree and find the table relation here). Then:
  1. the aggregate expression is partition columns, e.g. SELECT col FROM tbl GROUP BY col
  2. an aggregate function on partition columns with DISTINCT, e.g. SELECT count(DISTINCT a) FROM tbl GROUP BY b
  3. an aggregate function on partition columns which has the same result with or without the DISTINCT keyword, e.g. SELECT sum(a) FROM tbl GROUP BY b
-    val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
-      expr.collect {
-        case agg: AggregateExpression => agg
-      }
-    }.distinct
-    if (aggregateExpressions.isEmpty) {
-      // Support for aggregate that has no aggregateFunction when expressions are partition columns
-      // example: select partitionCol from table group by partitionCol.
-      // Moreover, multiple-distinct has been rewritted into it by RewriteDistinctAggregates.
-      true
+    if (!a.references.forall(_.isPartitionColumn)) {
+      // Support for scanning only partition columns
+      false
     } else {
-      aggregateExpressions.forall { agg =>
-        if (agg.isDistinct) {
-          true
-        } else {
-          // If function can be evaluated on just the distinct values of a column, it can be used
-          // by metadata-only optimizer.
-          agg.aggregateFunction match {
-            case max: Max => true
-            case min: Min => true
-            case hyperLog: HyperLogLogPlusPlus => true
-            case _ => false
-          }
-        }
-      }
+      val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+        expr.collect {
+          case agg: AggregateExpression => agg
+        }
+      }.distinct
+      if (aggregateExpressions.isEmpty) {
+        // Support for aggregate that has no aggregateFunction when expressions are partition
+        // columns. example: select partitionCol from table group by partitionCol.
+        // Moreover, multiple-distinct has been rewritted into it by RewriteDistinctAggregates.
+        true
+      } else {
+        aggregateExpressions.forall { agg =>
+          if (agg.isDistinct) {
+            true
+          } else {
+            // If function can be evaluated on just the distinct values of a column, it can be used
+            // by metadata-only optimizer.
+            agg.aggregateFunction match {
+              case max: Max => true
+              case min: Min => true
+              case hyperLog: HyperLogLogPlusPlus => true
+              case _ => false
+            }
+          }
+        }
+      }
     }
   }
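Why Max, Min and HyperLogLogPlusPlus pass the last check: they are insensitive to duplicate values, so evaluating them over just the distinct partition values listed in the metadata gives the same answer as scanning every row. A plain-Scala illustration:

val values = Seq(1, 2, 2, 3, 3, 3)
assert(values.max == values.distinct.max)  // Max is duplicate-insensitive
assert(values.min == values.distinct.min)  // so is Min
// Sum is not, which is why it falls through to `case _ => false` above:
assert(values.sum != values.distinct.sum)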

   private def convertLogicalToMetadataOnly(
-      project: LogicalPlan,
       filter: Option[Expression],
       logical: LogicalRelation,
       files: HadoopFsRelation): LogicalPlan = {
@@ -81,80 +84,62 @@
       attributeMap.getOrElse(field.name, throw new AnalysisException(
         s"Unable to resolve ${field.name} given [${logical.output.map(_.name).mkString(", ")}]"))
     }
-    val projectSet = filter.map(project.references ++ _.references).getOrElse(project.references)
-    if (projectSet.subsetOf(AttributeSet(partitionColumns))) {
-      val selectedPartitions = files.location.listFiles(filter.map(Seq(_)).getOrElse(Seq.empty))
-      val valuesRdd = sparkSession.sparkContext.parallelize(selectedPartitions.map(_.values), 1)
-      val valuesPlan = LogicalRDD(partitionColumns, valuesRdd)(sparkSession)
-      valuesPlan
-    } else {
-      logical
-    }
+    val selectedPartitions = files.location.listFiles(filter.map(Seq(_)).getOrElse(Seq.empty))
+    val valuesRdd = sparkSession.sparkContext.parallelize(selectedPartitions.map(_.values), 1)
+    val valuesPlan = LogicalRDD(partitionColumns, valuesRdd)(sparkSession)
+    valuesPlan
   }

-  private def convertCatalogToMetadataOnly(
-      project: LogicalPlan,
-      filter: Option[Expression],
-      relation: CatalogRelation): LogicalPlan = {
+  private def convertCatalogToMetadataOnly(relation: CatalogRelation): LogicalPlan = {
     val attributeMap = relation.output.map(attr => (attr.name, attr)).toMap
     val partitionColumns = relation.catalogTable.partitionColumnNames.map { column =>
       attributeMap.getOrElse(column, throw new AnalysisException(
         s"Unable to resolve ${column} given [${relation.output.map(_.name).mkString(", ")}]"))
     }
-    val projectSet = filter.map(project.references ++ _.references).getOrElse(project.references)
-    if (projectSet.subsetOf(AttributeSet(partitionColumns))) {
-      val partitionColumnDataTypes = partitionColumns.map(_.dataType)
-      val partitionValues = catalog.listPartitions(relation.catalogTable.identifier)
-        .map { p =>
-          InternalRow.fromSeq(
-            partitionColumns.map(a => p.spec(a.name)).zip(partitionColumnDataTypes).map {
-              case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
-            })
-        }
-      val valuesRdd = sparkSession.sparkContext.parallelize(partitionValues, 1)
-      val valuesPlan = LogicalRDD(partitionColumns, valuesRdd)(sparkSession)
-      valuesPlan
-    } else {
-      relation
-    }
+    val partitionColumnDataTypes = partitionColumns.map(_.dataType)
+    val partitionValues = catalog.listPartitions(relation.catalogTable.identifier)
+      .map { p =>
+        InternalRow.fromSeq(
+          partitionColumns.map(a => p.spec(a.name)).zip(partitionColumnDataTypes).map {
+            case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
+          })
+      }
+    val valuesRdd = sparkSession.sparkContext.parallelize(partitionValues, 1)
+    val valuesPlan = LogicalRDD(partitionColumns, valuesRdd)(sparkSession)
+    valuesPlan
   }
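The Cast(Literal(...)) step above is how the catalog's raw string partition values become typed rows; a standalone sketch of that conversion (values hypothetical):

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.IntegerType

// A Hive-style partition spec stores values as strings, e.g. "year=2016".
val rawValue = "2016"
val typed = Cast(Literal(rawValue), IntegerType).eval(null) // 2016 as a boxed Int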

   private def convertToMetadataOnly(plan: LogicalPlan): LogicalPlan = plan match {
Review comment (Contributor): We need to think about this more carefully, i.e. how can the partition information propagate up from the table relation? It's obvious that Filter can retain all partition information, but for the others it's not trivial to explain. Since this PR definitely needs more people to review, how about we only handle Filter for now and improve it later? Then it's easier for other people to review and get this PR in. Thanks!

-    case p @ Project(fields, child) =>
+    case p @ Project(fields, child) if p.references.forall(_.isPartitionColumn) =>
       child match {
         case f @ Filter(condition, l @ LogicalRelation(files: HadoopFsRelation, _, _))
-            if files.partitionSchema.nonEmpty =>
-          val plan = convertLogicalToMetadataOnly(p, Some(condition), l, files)
+            if files.partitionSchema.nonEmpty && f.references.forall(_.isPartitionColumn) =>
+          val plan = convertLogicalToMetadataOnly(Some(condition), l, files)
           p.withNewChildren(f.withNewChildren(plan :: Nil) :: Nil)

         case l @ LogicalRelation(files: HadoopFsRelation, _, _)
             if files.partitionSchema.nonEmpty =>
-          val plan = convertLogicalToMetadataOnly(p, None, l, files)
+          val plan = convertLogicalToMetadataOnly(None, l, files)
           p.withNewChildren(plan :: Nil)

         case f @ Filter(condition, relation: CatalogRelation)
-            if relation.catalogTable.partitionColumnNames.nonEmpty =>
-          val plan = convertCatalogToMetadataOnly(p, Some(condition), relation)
+            if relation.catalogTable.partitionColumnNames.nonEmpty &&
+              f.references.forall(_.isPartitionColumn) =>
+          val plan = convertCatalogToMetadataOnly(relation)
           p.withNewChildren(f.withNewChildren(plan :: Nil) :: Nil)

         case relation: CatalogRelation
             if relation.catalogTable.partitionColumnNames.nonEmpty =>
-          val plan = convertCatalogToMetadataOnly(p, None, relation)
+          val plan = convertCatalogToMetadataOnly(relation)
           p.withNewChildren(plan :: Nil)

         case other =>
-          p.withNewChildren(p.children.map(convertToMetadataOnly(_)))
+          other
       }

     case f : Filter =>
       f.withNewChildren(f.children.map(convertToMetadataOnly(_)))

     case e: Expand =>
       e.withNewChildren(e.children.map(convertToMetadataOnly(_)))

     case u: Union =>
       u.withNewChildren(u.children.map(convertToMetadataOnly(_)))

     case other: LogicalPlan =>
       other
   }
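A rough sketch of the rewrite this match performs on a qualifying plan, plus how a test might assert it (the names `spark`, `catalog`, and `plan` are hypothetical stand-ins):

// before: Aggregate(dt) <- Project(dt) <- LogicalRelation(HadoopFsRelation)
// after:  Aggregate(dt) <- Project(dt) <- LogicalRDD(one row per partition value)
val optimized = MetadataOnlyOptimizer(spark, catalog)(plan)
assert(optimized.collectFirst { case r: LogicalRDD => r }.isDefined)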
@@ -37,7 +37,10 @@ case class LogicalRelation(
   extends LeafNode with MultiInstanceRelation {

   override val output: Seq[AttributeReference] = {
-    val attrs = relation.schema.toAttributes
+    val attrs = relation.schema.toAttributes.map { attr =>
+      attr.setPartitionColumn(
+        relation.partitionColumnNames.contains(attr.name.toLowerCase))
+    }
     expectedOutputAttributes.map { expectedAttrs =>
       assert(expectedAttrs.length == attrs.length)
       attrs.zip(expectedAttrs).map {
@@ -151,6 +151,8 @@ case class HadoopFsRelation(
     })
   }

+  override def partitionColumnNames: Set[String] = partitionSchema.map(_.name.toLowerCase).toSet
+
   def partitionSchemaOption: Option[StructType] =
     if (partitionSchema.isEmpty) None else Some(partitionSchema)
   def partitionSpec: PartitionSpec = location.partitionSpec()
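Both this override and the LogicalRelation change above compare lower-cased names, presumably to match Spark SQL's default case-insensitive resolution. A quick sketch of that contract (schema hypothetical):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val partitionSchema = StructType(StructField("DT", StringType) :: Nil)
val partitionColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
assert(partitionColumnNames.contains("dt"))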
@@ -185,6 +185,7 @@ trait CreatableRelationProvider {
 abstract class BaseRelation {
   def sqlContext: SQLContext
   def schema: StructType
+  def partitionColumnNames: Set[String] = Set.empty[String]

   /**
    * Returns an estimated size of this relation in bytes. This information is used by the planner
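Since partitionColumnNames defaults to empty, existing sources are unaffected; a source that knows its partitioning can opt in by overriding it. A minimal sketch of a hypothetical custom relation:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MyPartitionedRelation(override val sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType = StructType(
    StructField("value", StringType) :: StructField("dt", StringType) :: Nil)
  // "dt" is laid out as a directory-style partition, so advertising it here
  // lets the metadata-only optimizer answer queries over it without file scans.
  override def partitionColumnNames: Set[String] = Set("dt")
}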
@@ -203,22 +203,22 @@ private[hive] case class MetastoreRelation(
   )

   implicit class SchemaAttribute(f: CatalogColumn) {
-    def toAttribute: AttributeReference = AttributeReference(
+    def toAttribute(isPartitionColumn: Boolean): AttributeReference = AttributeReference(
       f.name,
       CatalystSqlParser.parseDataType(f.dataType),
       // Since data can be dumped in randomly with no validation, everything is nullable.
       nullable = true
-    )(qualifier = Some(alias.getOrElse(tableName)))
+    )(qualifier = Some(alias.getOrElse(tableName)), isPartitionColumn = isPartitionColumn)
   }

   /** PartitionKey attributes */
-  val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
+  val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute(true))

   /** Non-partitionKey attributes */
   // TODO: just make this hold the schema itself, not just non-partition columns
   val attributes = catalogTable.schema
     .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
-    .map(_.toAttribute)
+    .map(_.toAttribute(false))

   val output = attributes ++ partitionKeys